From 2dd79bbd2b6e235f028a0de4588f74081152863e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 14 Apr 2026 15:26:58 +0200 Subject: [PATCH 1/7] feat: migrate OutboxStore from Exposed to plain JDBC (KOJAK-64) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Exposed ORM with plain JDBC in PostgresOutboxStore and MysqlOutboxStore. This eliminates the coupling to Exposed's transaction mechanism, allowing okapi to work transparently with any Spring PlatformTransactionManager — JPA/Hibernate, JDBC, jOOQ, MyBatis, etc. Key changes: - Add ConnectionProvider interface to okapi-core - Add SpringConnectionProvider using DataSourceUtils.getConnection() - Rewrite PostgresOutboxStore and MysqlOutboxStore to plain JDBC - Remove Exposed dependencies from okapi-postgres and okapi-mysql - Delete OutboxTable.kt from both modules - Add TransactionRunner to OutboxPurger (fixes connection leak) - Fix null parameter handling (setNull instead of setTimestamp(null)) - Parameterize status in removeDeliveredBefore SQL - Change spring-jdbc from compileOnly to implementation --- .../okapi/core/ConnectionProvider.kt | 17 ++ .../softwaremill/okapi/core/OutboxPurger.kt | 5 +- okapi-integration-tests/build.gradle.kts | 7 - .../test/concurrency/ConcurrentClaimTests.kt | 17 +- .../concurrency/MysqlConcurrentClaimTest.kt | 3 +- .../PostgresConcurrentClaimTest.kt | 3 +- .../okapi/test/e2e/HttpEndToEndTest.kt | 41 ++--- .../okapi/test/e2e/KafkaEndToEndTest.kt | 9 +- .../okapi/test/e2e/MysqlHttpEndToEndTest.kt | 27 ++- .../okapi/test/store/MysqlOutboxStoreTest.kt | 8 +- .../test/store/OutboxStoreContractTests.kt | 49 +++-- .../test/store/PostgresOutboxStoreTest.kt | 8 +- .../test/support/JdbcConnectionProvider.kt | 38 ++++ .../okapi/test/support/MysqlTestSupport.kt | 22 ++- .../okapi/test/support/PostgresTestSupport.kt | 22 ++- .../MultiDataSourceTransactionTest.kt | 11 +- okapi-mysql/build.gradle.kts | 14 +- .../okapi/mysql/MysqlOutboxStore.kt | 173 +++++++++-------- .../softwaremill/okapi/mysql/OutboxTable.kt | 31 ---- .../okapi/mysql/MysqlOutboxStoreTest.kt | 147 --------------- okapi-postgres/build.gradle.kts | 14 +- .../okapi/postgres/OutboxTable.kt | 24 --- .../okapi/postgres/PostgresOutboxStore.kt | 174 ++++++++++-------- okapi-spring-boot/build.gradle.kts | 5 +- .../springboot/OutboxAutoConfiguration.kt | 14 +- .../okapi/springboot/OutboxPurgerScheduler.kt | 3 + .../springboot/SpringConnectionProvider.kt | 18 ++ .../springboot/OutboxMysqlEndToEndTest.kt | 69 ++++--- 28 files changed, 459 insertions(+), 514 deletions(-) create mode 100644 okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt delete mode 100644 okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt delete mode 100644 okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt delete mode 100644 okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/OutboxTable.kt create mode 100644 okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt new file mode 100644 index 0000000..6586ffb --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt @@ -0,0 +1,17 @@ +package com.softwaremill.okapi.core + +import java.sql.Connection + +/** + * Provides a JDBC [Connection] from the current transactional context. + * + * Implementations bridge okapi's [OutboxStore] with the caller's transaction mechanism: + * - `okapi-spring-boot`: uses `DataSourceUtils.getConnection()` — works with JPA, JDBC, jOOQ, MyBatis, Exposed + * - `okapi-exposed`: uses Exposed's `TransactionManager.current().connection` — for Ktor/standalone Exposed + * - Standalone: user-provided lambda wrapping a `DataSource` or `ThreadLocal` + * + * The returned connection is **borrowed** from the current transaction — the caller must NOT close it. + */ +fun interface ConnectionProvider { + fun getConnection(): Connection +} diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 3c72376..632451c 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicBoolean */ class OutboxPurger( private val outboxStore: OutboxStore, + private val transactionRunner: TransactionRunner? = null, private val config: OutboxPurgerConfig = OutboxPurgerConfig(), private val clock: Clock = Clock.systemUTC(), ) { @@ -58,7 +59,9 @@ class OutboxPurger( var totalDeleted = 0 var batches = 0 do { - val deleted = outboxStore.removeDeliveredBefore(cutoff, config.batchSize) + val deleted = transactionRunner?.runInTransaction { + outboxStore.removeDeliveredBefore(cutoff, config.batchSize) + } ?: outboxStore.removeDeliveredBefore(cutoff, config.batchSize) totalDeleted += deleted batches++ } while (deleted == config.batchSize && batches < MAX_BATCHES_PER_TICK) diff --git a/okapi-integration-tests/build.gradle.kts b/okapi-integration-tests/build.gradle.kts index 7860b89..1635690 100644 --- a/okapi-integration-tests/build.gradle.kts +++ b/okapi-integration-tests/build.gradle.kts @@ -25,13 +25,6 @@ dependencies { testImplementation(libs.postgresql) testImplementation(libs.mysql) - // Exposed ORM (for transaction blocks and DB queries in tests) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) - testImplementation(libs.exposedSpringTransaction) - // Liquibase (schema migrations in tests) testImplementation(libs.liquibaseCore) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt index c3a0d64..55f1fd6 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt @@ -9,13 +9,13 @@ import com.softwaremill.okapi.core.OutboxProcessor import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.test.support.JdbcConnectionProvider import com.softwaremill.okapi.test.support.RecordingMessageDeliverer import io.kotest.assertions.withClue import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.maps.shouldContain import io.kotest.matchers.shouldBe -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.sql.Connection import java.time.Clock import java.time.Instant @@ -41,15 +41,18 @@ private fun createTestEntry(index: Int, now: Instant = Instant.parse("2024-01-01 fun FunSpec.concurrentClaimTests( dbName: String, + jdbcProvider: () -> JdbcConnectionProvider, storeFactory: () -> OutboxStore, startDb: () -> Unit, stopDb: () -> Unit, truncate: () -> Unit, ) { lateinit var store: OutboxStore + lateinit var jdbc: JdbcConnectionProvider beforeSpec { startDb() + jdbc = jdbcProvider() store = storeFactory() } @@ -63,7 +66,7 @@ fun FunSpec.concurrentClaimTests( test("[$dbName] concurrent claimPending with held locks produces disjoint sets") { // Insert 20 entries - val allIds = transaction { + val allIds = jdbc.withTransaction { (0 until 20).map { i -> val entry = createTestEntry(i) store.persist(entry) @@ -80,7 +83,7 @@ fun FunSpec.concurrentClaimTests( // next-key locks cause SKIP LOCKED to skip more rows than actually locked. val threadA = Thread.ofVirtual().name("processor-A").start { try { - transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { val claimed = store.claimPending(10) claimedByA.complete(claimed.map { it.outboxId }) lockAcquired.countDown() @@ -97,7 +100,7 @@ fun FunSpec.concurrentClaimTests( // Main thread: claim remaining entries (SKIP LOCKED should skip A's locked rows) val idsA = claimedByA.get(10, TimeUnit.SECONDS) - val idsB = transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + val idsB = jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { store.claimPending(10) }.map { it.outboxId } @@ -123,7 +126,7 @@ fun FunSpec.concurrentClaimTests( val fixedClock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC) // Insert 50 entries - transaction { + jdbc.withTransaction { (0 until 50).forEach { i -> store.persist(createTestEntry(i)) } } @@ -137,7 +140,7 @@ fun FunSpec.concurrentClaimTests( CompletableFuture.supplyAsync( { barrier.await(10, TimeUnit.SECONDS) - transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { OutboxProcessor(store, entryProcessor).processNext(limit = 50) } }, @@ -156,7 +159,7 @@ fun FunSpec.concurrentClaimTests( } // Verify DB state - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } withClue("DB state after concurrent processing: $counts") { counts shouldContain (OutboxStatus.DELIVERED to 50L) counts shouldContain (OutboxStatus.PENDING to 0L) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt index 7ac7287..f2cb4a5 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt @@ -12,7 +12,8 @@ class MysqlConcurrentClaimTest : FunSpec({ concurrentClaimTests( dbName = "mysql", - storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + jdbcProvider = { db.jdbc }, + storeFactory = { MysqlOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt index b4b1c96..19710bc 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt @@ -12,7 +12,8 @@ class PostgresConcurrentClaimTest : FunSpec({ concurrentClaimTests( dbName = "postgres", - storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + jdbcProvider = { db.jdbc }, + storeFactory = { PostgresOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt index d6832ba..81633e5 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt @@ -21,7 +21,6 @@ import com.softwaremill.okapi.postgres.PostgresOutboxStore import com.softwaremill.okapi.test.support.PostgresTestSupport import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.maps.shouldContain -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Clock class HttpEndToEndTest : FunSpec({ @@ -45,7 +44,7 @@ class HttpEndToEndTest : FunSpec({ fun buildPipeline(maxRetries: Int = 3): Triple { val clock = Clock.systemUTC() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } val entryProcessor = OutboxEntryProcessor( @@ -70,15 +69,15 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(200)), ) - transaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } wiremock.verify( postRequestedFor(urlEqualTo("/api/notify")) .withRequestBody(equalTo(payload)), ) - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) } @@ -90,10 +89,10 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(500)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) counts shouldContain (OutboxStatus.DELIVERED to 0L) } @@ -106,10 +105,10 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(400)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.FAILED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) } @@ -122,10 +121,10 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) } @@ -133,13 +132,13 @@ class HttpEndToEndTest : FunSpec({ val (publisher, _, store) = buildPipeline() runCatching { - transaction { + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) error("Simulated business logic failure") } } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 0L) } @@ -151,19 +150,19 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(500)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } // First 3 processNext calls: retries 0->1, 1->2, 2->3 — stays PENDING repeat(3) { - transaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + db.jdbc.withTransaction { processor.processNext() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) } // 4th processNext: retries==3, shouldRetry(3) returns false -> FAILED - transaction { processor.processNext() } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.FAILED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt index 02bee9f..3fa9c9f 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt @@ -15,7 +15,6 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.maps.shouldContain import io.kotest.matchers.shouldBe import org.apache.kafka.clients.producer.KafkaProducer -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Clock import java.time.Duration import java.util.UUID @@ -43,7 +42,7 @@ class KafkaEndToEndTest : FunSpec({ test("full pipeline: publish to outbox -> processNext -> message on Kafka topic") { val clock = Clock.systemUTC() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val deliverer = KafkaMessageDeliverer(producer!!) val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock) @@ -56,10 +55,10 @@ class KafkaEndToEndTest : FunSpec({ partitionKey = "user-1" } - transaction { publisher.publish(OutboxMessage("order.created", payload), info) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", payload), info) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) val consumer = kafka.createConsumer(groupId = "e2e-test-${UUID.randomUUID()}") diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt index 99d6d22..41c5056 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt @@ -21,7 +21,6 @@ import com.softwaremill.okapi.mysql.MysqlOutboxStore import com.softwaremill.okapi.test.support.MysqlTestSupport import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.maps.shouldContain -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Clock class MysqlHttpEndToEndTest : FunSpec({ @@ -45,7 +44,7 @@ class MysqlHttpEndToEndTest : FunSpec({ fun buildPipeline(): Triple { val clock = Clock.systemUTC() - val store = MysqlOutboxStore(clock) + val store = MysqlOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } val entryProcessor = OutboxEntryProcessor( @@ -70,15 +69,15 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(200)), ) - transaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } wiremock.verify( postRequestedFor(urlEqualTo("/api/notify")) .withRequestBody(equalTo(payload)), ) - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) } @@ -90,10 +89,10 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(500)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) counts shouldContain (OutboxStatus.DELIVERED to 0L) } @@ -106,10 +105,10 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(400)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.FAILED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) } @@ -122,10 +121,10 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) } }) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt index c2d4c51..8576fd4 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt @@ -12,7 +12,13 @@ class MysqlOutboxStoreTest : FunSpec({ outboxStoreContractTests( dbName = "mysql", - storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + storeFactory = { + MysqlOutboxStore( + connectionProvider = db.jdbc, + clock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC), + ) + }, + jdbcProvider = { db.jdbc }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt index 58a0260..04ce1a2 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt @@ -5,11 +5,11 @@ import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxMessage import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.test.support.JdbcConnectionProvider import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.maps.shouldContain import io.kotest.matchers.shouldBe -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Instant private class StubDeliveryInfo( @@ -33,14 +33,17 @@ private fun createTestEntry( fun FunSpec.outboxStoreContractTests( dbName: String, storeFactory: () -> OutboxStore, + jdbcProvider: () -> JdbcConnectionProvider, startDb: () -> Unit, stopDb: () -> Unit, truncate: () -> Unit, ) { lateinit var store: OutboxStore + lateinit var jdbc: JdbcConnectionProvider beforeSpec { startDb() + jdbc = jdbcProvider() store = storeFactory() } @@ -55,9 +58,9 @@ fun FunSpec.outboxStoreContractTests( test("[$dbName] persist and read back via claimPending") { val entry = createTestEntry() - transaction { store.persist(entry) } + jdbc.withTransaction { store.persist(entry) } - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 1 val found = claimed.first() @@ -75,18 +78,17 @@ fun FunSpec.outboxStoreContractTests( val t2 = Instant.parse("2024-01-02T00:00:00Z") val t3 = Instant.parse("2024-01-03T00:00:00Z") - // Insert in non-sequential order to verify ordering val e2 = createTestEntry(now = t2, messageType = "type.second") val e3 = createTestEntry(now = t3, messageType = "type.third") val e1 = createTestEntry(now = t1, messageType = "type.first") - transaction { + jdbc.withTransaction { store.persist(e2) store.persist(e3) store.persist(e1) } - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 3 claimed[0].messageType shouldBe "type.first" @@ -95,7 +97,7 @@ fun FunSpec.outboxStoreContractTests( } test("[$dbName] claimPending respects limit") { - transaction { + jdbc.withTransaction { repeat(5) { i -> val entry = createTestEntry( now = Instant.parse("2024-01-01T00:00:00Z").plusSeconds(i.toLong()), @@ -105,7 +107,7 @@ fun FunSpec.outboxStoreContractTests( } } - val claimed = transaction { store.claimPending(2) } + val claimed = jdbc.withTransaction { store.claimPending(2) } claimed shouldHaveSize 2 } @@ -120,19 +122,18 @@ fun FunSpec.outboxStoreContractTests( messageType = "type.delivered", ) - transaction { + jdbc.withTransaction { store.persist(pendingEntry) store.persist(toBeDelivered) } - // Claim the second entry and mark it delivered - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val deliveredCandidate = claimed.first { it.outboxId == toBeDelivered.outboxId } store.updateAfterProcessing(deliveredCandidate.toDelivered(Instant.parse("2024-01-02T00:00:00Z"))) } - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 1 claimed.first().messageType shouldBe "type.pending" @@ -141,15 +142,15 @@ fun FunSpec.outboxStoreContractTests( test("[$dbName] updateAfterProcessing persists status change") { val entry = createTestEntry() - transaction { store.persist(entry) } + jdbc.withTransaction { store.persist(entry) } - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val delivered = claimed.first().toDelivered(Instant.parse("2024-01-02T00:00:00Z")) store.updateAfterProcessing(delivered) } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) @@ -165,13 +166,12 @@ fun FunSpec.outboxStoreContractTests( messageType = "type.recent", ) - transaction { + jdbc.withTransaction { store.persist(oldEntry) store.persist(recentEntry) } - // Mark both as delivered with different lastAttempt timestamps - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val old = claimed.first { it.outboxId == oldEntry.outboxId } val recent = claimed.first { it.outboxId == recentEntry.outboxId } @@ -179,10 +179,9 @@ fun FunSpec.outboxStoreContractTests( store.updateAfterProcessing(recent.toDelivered(Instant.parse("2024-01-11T00:00:00Z"))) } - // Remove delivered before Jan 5 — should delete old (lastAttempt=Jan 2) but keep recent (lastAttempt=Jan 11) - transaction { store.removeDeliveredBefore(Instant.parse("2024-01-05T00:00:00Z"), limit = 100) } + jdbc.withTransaction { store.removeDeliveredBefore(Instant.parse("2024-01-05T00:00:00Z"), limit = 100) } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) } @@ -204,14 +203,14 @@ fun FunSpec.outboxStoreContractTests( messageType = "type.failed", ) - transaction { + jdbc.withTransaction { store.persist(pending1) store.persist(pending2) store.persist(toDeliver) store.persist(toFail) } - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val deliverEntry = claimed.first { it.outboxId == toDeliver.outboxId } val failEntry = claimed.first { it.outboxId == toFail.outboxId } @@ -219,7 +218,7 @@ fun FunSpec.outboxStoreContractTests( store.updateAfterProcessing(failEntry.toFailed(Instant.parse("2024-01-02T00:00:00Z"), "some error")) } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 2L) counts shouldContain (OutboxStatus.DELIVERED to 1L) @@ -227,7 +226,7 @@ fun FunSpec.outboxStoreContractTests( } test("[$dbName] claimPending returns empty when no PENDING entries") { - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 0 } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt index 1c77bf1..0f2d051 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt @@ -12,7 +12,13 @@ class PostgresOutboxStoreTest : FunSpec({ outboxStoreContractTests( dbName = "postgres", - storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + storeFactory = { + PostgresOutboxStore( + connectionProvider = db.jdbc, + clock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC), + ) + }, + jdbcProvider = { db.jdbc }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt new file mode 100644 index 0000000..eb7241c --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt @@ -0,0 +1,38 @@ +package com.softwaremill.okapi.test.support + +import com.softwaremill.okapi.core.ConnectionProvider +import java.sql.Connection +import javax.sql.DataSource + +/** + * Test helper that provides a [ConnectionProvider] backed by a [ThreadLocal] connection. + * Use [withTransaction] to bind a JDBC connection for the duration of a block. + */ +class JdbcConnectionProvider(private val dataSource: DataSource) : ConnectionProvider { + private val threadLocalConnection = ThreadLocal() + + override fun getConnection(): Connection = threadLocalConnection.get() + ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + + fun withTransaction(block: () -> T): T = withTransaction(transactionIsolation = null, block) + + fun withTransaction(transactionIsolation: Int?, block: () -> T): T { + val conn = dataSource.connection + conn.autoCommit = false + if (transactionIsolation != null) { + conn.transactionIsolation = transactionIsolation + } + threadLocalConnection.set(conn) + return try { + val result = block() + conn.commit() + result + } catch (e: Exception) { + conn.rollback() + throw e + } finally { + threadLocalConnection.remove() + conn.close() + } + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt index 70944d8..e883eeb 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt @@ -1,25 +1,27 @@ package com.softwaremill.okapi.test.support +import com.mysql.cj.jdbc.MysqlDataSource import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import org.testcontainers.containers.MySQLContainer import java.sql.DriverManager +import javax.sql.DataSource class MysqlTestSupport { val container = MySQLContainer("mysql:8.0") + lateinit var dataSource: DataSource + lateinit var jdbc: JdbcConnectionProvider fun start() { container.start() - Database.connect( - url = container.jdbcUrl, - driver = container.driverClassName, - user = container.username, - password = container.password, - ) + dataSource = MysqlDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + setPassword(container.password) + } + jdbc = JdbcConnectionProvider(dataSource) runLiquibase() } @@ -28,7 +30,9 @@ class MysqlTestSupport { } fun truncate() { - transaction { exec("DELETE FROM outbox") } + jdbc.withTransaction { + jdbc.getConnection().createStatement().use { it.execute("DELETE FROM outbox") } + } } private fun runLiquibase() { diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt index 3a54934..aab6a75 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt @@ -4,22 +4,24 @@ import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import org.postgresql.ds.PGSimpleDataSource import org.testcontainers.containers.PostgreSQLContainer import java.sql.DriverManager +import javax.sql.DataSource class PostgresTestSupport { val container = PostgreSQLContainer("postgres:16") + lateinit var dataSource: DataSource + lateinit var jdbc: JdbcConnectionProvider fun start() { container.start() - Database.connect( - url = container.jdbcUrl, - driver = container.driverClassName, - user = container.username, - password = container.password, - ) + dataSource = PGSimpleDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + password = container.password + } + jdbc = JdbcConnectionProvider(dataSource) runLiquibase() } @@ -28,7 +30,9 @@ class PostgresTestSupport { } fun truncate() { - transaction { exec("TRUNCATE TABLE outbox") } + jdbc.withTransaction { + jdbc.getConnection().createStatement().use { it.execute("TRUNCATE TABLE outbox") } + } } private fun runLiquibase() { diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt index 4f4fff6..aa75755 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt @@ -5,6 +5,7 @@ import com.softwaremill.okapi.core.OutboxMessage import com.softwaremill.okapi.core.OutboxPublisher import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.springboot.SpringConnectionProvider import com.softwaremill.okapi.springboot.SpringOutboxPublisher import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.FunSpec @@ -14,7 +15,6 @@ import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.spring7.transaction.SpringTransactionManager import org.postgresql.ds.PGSimpleDataSource import org.springframework.jdbc.datasource.DataSourceTransactionManager import org.springframework.transaction.support.TransactionTemplate @@ -31,8 +31,7 @@ import javax.sql.DataSource * - **outboxContainer**: hosts the outbox table (Liquibase migration applied) * - **otherContainer**: a second database with no outbox table * - * The outbox DataSource uses [SpringTransactionManager] (Exposed-compatible), - * while the other DataSource uses plain [DataSourceTransactionManager]. + * Both DataSources use plain [DataSourceTransactionManager]. */ class MultiDataSourceTransactionTest : FunSpec({ @@ -74,16 +73,14 @@ class MultiDataSourceTransactionTest : FunSpec({ // Run Liquibase migration only on the outbox database runLiquibase(outboxContainer) - // SpringTransactionManager (Exposed) for the outbox DataSource — - // PostgresOutboxStore uses Exposed internally, so the transaction must be Exposed-compatible - val outboxTxManager = SpringTransactionManager(outboxDataSource) + val outboxTxManager = DataSourceTransactionManager(outboxDataSource) outboxTxTemplate = TransactionTemplate(outboxTxManager) // Plain DataSourceTransactionManager for the other DataSource val otherTxManager = DataSourceTransactionManager(otherDataSource) otherTxTemplate = TransactionTemplate(otherTxManager) - store = PostgresOutboxStore(clock) + store = PostgresOutboxStore(SpringConnectionProvider(outboxDataSource), clock) val corePublisher = OutboxPublisher(store, clock) publisher = SpringOutboxPublisher(delegate = corePublisher, dataSource = outboxDataSource) } diff --git a/okapi-mysql/build.gradle.kts b/okapi-mysql/build.gradle.kts index 1ba1d75..dd671e5 100644 --- a/okapi-mysql/build.gradle.kts +++ b/okapi-mysql/build.gradle.kts @@ -3,27 +3,15 @@ plugins { id("buildsrc.convention.publish") } -description = "MySQL outbox store using Exposed" +description = "MySQL outbox store using plain JDBC" dependencies { implementation(project(":okapi-core")) - compileOnly(libs.exposedCore) - compileOnly(libs.exposedJdbc) - compileOnly(libs.exposedJson) - compileOnly(libs.exposedJavaTime) - - implementation(libs.jacksonModuleKotlin) - implementation(libs.jacksonDatatypeJsr310) - compileOnly(libs.liquibaseCore) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) testImplementation(libs.testcontainersMysql) testImplementation(libs.mysql) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) } diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index 424f938..fd2e824 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -1,39 +1,55 @@ package com.softwaremill.okapi.mysql +import com.softwaremill.okapi.core.ConnectionProvider import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore -import org.jetbrains.exposed.v1.core.IntegerColumnType -import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.jdbc.select -import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager -import org.jetbrains.exposed.v1.jdbc.upsert import java.sql.ResultSet +import java.sql.Timestamp import java.time.Clock import java.time.Instant import java.util.UUID -/** MySQL [OutboxStore] implementation using Exposed. */ +/** MySQL [OutboxStore] implementation using plain JDBC. */ class MysqlOutboxStore( - private val clock: Clock, + private val connectionProvider: ConnectionProvider, + private val clock: Clock = Clock.systemUTC(), ) : OutboxStore { + override fun persist(entry: OutboxEntry): OutboxEntry { - OutboxTable.upsert { - it[id] = entry.outboxId - it[messageType] = entry.messageType - it[payload] = entry.payload - it[deliveryType] = entry.deliveryType - it[status] = entry.status.name - it[createdAt] = entry.createdAt - it[updatedAt] = entry.updatedAt - it[retries] = entry.retries - it[lastAttempt] = entry.lastAttempt - it[lastError] = entry.lastError - it[deliveryMetadata] = entry.deliveryMetadata + val sql = """ + INSERT INTO outbox (id, message_type, payload, delivery_type, status, created_at, updated_at, retries, last_attempt, last_error, delivery_metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + status = VALUES(status), + updated_at = VALUES(updated_at), + retries = VALUES(retries), + last_attempt = VALUES(last_attempt), + last_error = VALUES(last_error), + delivery_metadata = VALUES(delivery_metadata) + """.trimIndent() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, entry.outboxId.raw.toString()) + stmt.setString(2, entry.messageType) + stmt.setString(3, entry.payload) + stmt.setString(4, entry.deliveryType) + stmt.setString(5, entry.status.name) + stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) + stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) + stmt.setInt(8, entry.retries) + if (entry.lastAttempt != null) { + stmt.setTimestamp( + 9, + Timestamp.from(entry.lastAttempt), + ) + } else { + stmt.setNull(9, java.sql.Types.TIMESTAMP) + } + if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) + stmt.setString(11, entry.deliveryMetadata) + stmt.executeUpdate() } return entry } @@ -42,81 +58,90 @@ class MysqlOutboxStore( // FORCE INDEX ensures InnoDB walks the (status, created_at) index so // that FOR UPDATE SKIP LOCKED only row-locks the rows actually returned // by LIMIT, rather than every row matching the WHERE clause. - val nativeQuery = - "SELECT * FROM ${OutboxTable.tableName}" + - " FORCE INDEX (idx_outbox_status_created_at)" + - " WHERE ${OutboxTable.status.name} = '${OutboxStatus.PENDING}'" + - " ORDER BY ${OutboxTable.createdAt.name} ASC" + - " LIMIT $limit FOR UPDATE SKIP LOCKED" + val sql = """ + SELECT * FROM outbox + FORCE INDEX (idx_outbox_status_created_at) + WHERE status = ? + ORDER BY created_at ASC + LIMIT ? + FOR UPDATE SKIP LOCKED + """.trimIndent() - return TransactionManager.current().exec(nativeQuery) { rs -> - generateSequence { - if (rs.next()) mapFromResultSet(rs) else null - }.toList() - } ?: emptyList() + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.PENDING.name) + stmt.setInt(2, limit) + stmt.executeQuery().use { rs -> + generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + } + } } override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) override fun removeDeliveredBefore(time: Instant, limit: Int): Int { val sql = """ - DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( - SELECT ${OutboxTable.id.name} FROM ( - SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} - WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' - AND ${OutboxTable.lastAttempt.name} < ? - ORDER BY ${OutboxTable.id.name} + DELETE FROM outbox WHERE id IN ( + SELECT id FROM ( + SELECT id FROM outbox + WHERE status = ? + AND last_attempt < ? + ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED ) AS batch ) """.trimIndent() - val statement = TransactionManager.current().connection.prepareStatement(sql, false) - statement.fillParameters( - listOf( - OutboxTable.lastAttempt.columnType to time, - IntegerColumnType() to limit, - ), - ) - return statement.executeUpdate() + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.DELIVERED.name) + stmt.setTimestamp(2, Timestamp.from(time)) + stmt.setInt(3, limit) + stmt.executeUpdate() + } } override fun findOldestCreatedAt(statuses: Set): Map { val result = statuses.associateWith { clock.instant() }.toMutableMap() - val minAlias = OutboxTable.createdAt.min().alias("min_created_at") - OutboxTable - .select(OutboxTable.status, minAlias) - .where { OutboxTable.status inList statuses.map { status -> status.name } } - .groupBy(OutboxTable.status) - .forEach { row -> - val s = OutboxStatus.from(row[OutboxTable.status]) - result[s] = requireNotNull(row[minAlias]) + val placeholders = statuses.joinToString(",") { "?" } + val sql = "SELECT status, MIN(created_at) AS min_created_at FROM outbox WHERE status IN ($placeholders) GROUP BY status" + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } + stmt.executeQuery().use { rs -> + while (rs.next()) { + val s = OutboxStatus.from(rs.getString("status")) + result[s] = rs.getTimestamp("min_created_at").toInstant() + } } + } return result } override fun countByStatuses(): Map { - val countAlias = OutboxTable.status.count().alias("count") - val counts = - OutboxTable - .select(OutboxTable.status, countAlias) - .groupBy(OutboxTable.status) - .associate { row -> OutboxStatus.from(row[OutboxTable.status]) to row[countAlias] } - return OutboxStatus.entries.associateWith { status -> counts[status] ?: 0L } + val sql = "SELECT status, COUNT(*) AS count FROM outbox GROUP BY status" + val counts = mutableMapOf() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.executeQuery().use { rs -> + while (rs.next()) { + counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + } + } + } + return OutboxStatus.entries.associateWith { counts[it] ?: 0L } } - private fun mapFromResultSet(rs: ResultSet): OutboxEntry = OutboxEntry( - outboxId = OutboxId(UUID.fromString(rs.getString("id"))), - messageType = rs.getString("message_type"), - payload = rs.getString("payload"), - deliveryType = rs.getString("delivery_type"), - status = OutboxStatus.from(rs.getString("status")), - createdAt = rs.getTimestamp("created_at").toInstant(), - updatedAt = rs.getTimestamp("updated_at").toInstant(), - retries = rs.getInt("retries"), - lastAttempt = rs.getTimestamp("last_attempt")?.toInstant(), - lastError = rs.getString("last_error"), - deliveryMetadata = rs.getString("delivery_metadata"), + private fun ResultSet.toOutboxEntry(): OutboxEntry = OutboxEntry( + outboxId = OutboxId(UUID.fromString(getString("id"))), + messageType = getString("message_type"), + payload = getString("payload"), + deliveryType = getString("delivery_type"), + status = OutboxStatus.from(getString("status")), + createdAt = getTimestamp("created_at").toInstant(), + updatedAt = getTimestamp("updated_at").toInstant(), + retries = getInt("retries"), + lastAttempt = getTimestamp("last_attempt")?.toInstant(), + lastError = getString("last_error"), + deliveryMetadata = getString("delivery_metadata"), ) } diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt deleted file mode 100644 index a525706..0000000 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt +++ /dev/null @@ -1,31 +0,0 @@ -package com.softwaremill.okapi.mysql - -import com.softwaremill.okapi.core.OutboxId -import com.softwaremill.okapi.core.OutboxStatus -import org.jetbrains.exposed.v1.core.Table -import org.jetbrains.exposed.v1.javatime.timestamp -import org.jetbrains.exposed.v1.json.json -import java.util.UUID - -internal object OutboxTable : Table("outbox") { - val id = varchar("id", 36).transform( - wrap = { str -> OutboxId(UUID.fromString(str)) }, - unwrap = { outboxId -> outboxId.raw.toString() }, - ) - val messageType = varchar("message_type", 255) - val payload = text("payload") - val deliveryType = varchar("delivery_type", 50) - val status = varchar("status", 50).default(OutboxStatus.PENDING.name) - val createdAt = timestamp("created_at") - val updatedAt = timestamp("updated_at") - val retries = integer("retries").default(0) - val lastAttempt = timestamp("last_attempt").nullable() - val lastError = text("last_error").nullable() - val deliveryMetadata = json("delivery_metadata", { it }, { it }) - - override val primaryKey = PrimaryKey(id) - - init { - index("idx_outbox_status_created_at", isUnique = false, status, createdAt) - } -} diff --git a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt deleted file mode 100644 index cbaa5c3..0000000 --- a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt +++ /dev/null @@ -1,147 +0,0 @@ -package com.softwaremill.okapi.mysql - -import com.softwaremill.okapi.core.OutboxEntry -import com.softwaremill.okapi.core.OutboxId -import com.softwaremill.okapi.core.OutboxStatus -import io.kotest.core.spec.style.BehaviorSpec -import io.kotest.matchers.collections.shouldHaveSize -import io.kotest.matchers.shouldBe -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.SchemaUtils -import org.jetbrains.exposed.v1.jdbc.transactions.transaction -import org.testcontainers.containers.MySQLContainer -import java.time.Clock -import java.time.Instant - -class MysqlOutboxStoreTest : BehaviorSpec({ - val mysql = MySQLContainer("mysql:8.0").apply { start() } - - val db = Database.connect( - url = mysql.jdbcUrl, - driver = mysql.driverClassName, - user = mysql.username, - password = mysql.password, - ) - - val clock = Clock.systemUTC() - val store = MysqlOutboxStore(clock) - - beforeSpec { - transaction(db) { - SchemaUtils.create(OutboxTable) - } - } - - afterSpec { - mysql.stop() - } - - given("persist and claimPending") { - `when`("an entry is persisted") { - val entry = newEntry() - transaction(db) { store.persist(entry) } - - then("claimPending returns it") { - val claimed = transaction(db) { store.claimPending(10) } - claimed shouldHaveSize 1 - claimed.first().outboxId shouldBe entry.outboxId - claimed.first().status shouldBe OutboxStatus.PENDING - } - } - } - - given("updateAfterProcessing") { - `when`("entry is marked DELIVERED") { - val entry = newEntry() - transaction(db) { store.persist(entry) } - - val delivered = entry.copy(status = OutboxStatus.DELIVERED, lastAttempt = Instant.now(clock)) - transaction(db) { store.updateAfterProcessing(delivered) } - - then("claimPending no longer returns it") { - val claimed = transaction(db) { store.claimPending(10) } - claimed.none { it.outboxId == entry.outboxId } shouldBe true - } - } - } - - given("removeDeliveredBefore") { - `when`("called with a cutoff time") { - transaction(db) { exec("DELETE FROM outbox") } - val entry = newEntry() - val delivered = entry.copy( - status = OutboxStatus.DELIVERED, - lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), - ) - transaction(db) { store.persist(delivered) } - transaction(db) { store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), Int.MAX_VALUE) } - - then("old delivered entries are removed") { - val counts = transaction(db) { store.countByStatuses() } - counts[OutboxStatus.DELIVERED] shouldBe 0L - } - } - } - - given("removeDeliveredBefore with limit") { - `when`("limit is smaller than matching entries") { - transaction(db) { exec("DELETE FROM outbox") } - repeat(5) { - val entry = newEntry().copy( - status = OutboxStatus.DELIVERED, - lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), - ) - transaction(db) { store.persist(entry) } - } - - val deleted = transaction(db) { - store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), 3) - } - - then("only deletes up to limit") { - deleted shouldBe 3 - } - then("remaining entries still exist") { - val counts = transaction(db) { store.countByStatuses() } - counts[OutboxStatus.DELIVERED] shouldBe 2L - } - } - } - - given("countByStatuses") { - `when`("entries exist with different statuses") { - transaction(db) { - exec("DELETE FROM outbox") - store.persist(newEntry().copy(status = OutboxStatus.PENDING)) - store.persist(newEntry().copy(status = OutboxStatus.PENDING)) - store.persist( - newEntry().copy( - status = OutboxStatus.DELIVERED, - lastAttempt = Instant.now(clock), - ), - ) - } - - then("returns correct counts per status") { - val counts = transaction(db) { store.countByStatuses() } - counts[OutboxStatus.PENDING] shouldBe 2L - counts[OutboxStatus.DELIVERED] shouldBe 1L - counts[OutboxStatus.FAILED] shouldBe 0L - } - } - } -}) - -private fun newEntry() = OutboxEntry( - outboxId = OutboxId.new(), - messageType = "test.event", - payload = """{"key": "value"}""", - deliveryType = "http", - status = OutboxStatus.PENDING, - createdAt = Instant.now(), - updatedAt = Instant.now(), - retries = 0, - lastAttempt = null, - lastError = null, - deliveryMetadata = """{"type": "http", "url": "http://localhost"}""", -) diff --git a/okapi-postgres/build.gradle.kts b/okapi-postgres/build.gradle.kts index ac12d31..6185570 100644 --- a/okapi-postgres/build.gradle.kts +++ b/okapi-postgres/build.gradle.kts @@ -3,27 +3,15 @@ plugins { id("buildsrc.convention.publish") } -description = "PostgreSQL outbox store using Exposed" +description = "PostgreSQL outbox store using plain JDBC" dependencies { implementation(project(":okapi-core")) - compileOnly(libs.exposedCore) - compileOnly(libs.exposedJdbc) - compileOnly(libs.exposedJson) - compileOnly(libs.exposedJavaTime) - - implementation(libs.jacksonModuleKotlin) - implementation(libs.jacksonDatatypeJsr310) - compileOnly(libs.liquibaseCore) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) testImplementation(libs.testcontainersPostgresql) testImplementation(libs.postgresql) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) } diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/OutboxTable.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/OutboxTable.kt deleted file mode 100644 index 4aeedc0..0000000 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/OutboxTable.kt +++ /dev/null @@ -1,24 +0,0 @@ -package com.softwaremill.okapi.postgres - -import com.softwaremill.okapi.core.OutboxId -import com.softwaremill.okapi.core.OutboxStatus -import org.jetbrains.exposed.v1.core.Table -import org.jetbrains.exposed.v1.core.java.javaUUID -import org.jetbrains.exposed.v1.javatime.timestamp -import org.jetbrains.exposed.v1.json.json - -internal object OutboxTable : Table("outbox") { - val id = javaUUID("id").transform(wrap = { uuid -> OutboxId(uuid) }, unwrap = { outboxId -> outboxId.raw }) - val messageType = varchar("message_type", 255) - val payload = text("payload") - val deliveryType = varchar("delivery_type", 50) - val status = varchar("status", 50).default(OutboxStatus.PENDING.name) - val createdAt = timestamp("created_at") - val updatedAt = timestamp("updated_at") - val retries = integer("retries").default(0) - val lastAttempt = timestamp("last_attempt").nullable() - val lastError = text("last_error").nullable() - val deliveryMetadata = json("delivery_metadata", { it }, { it }) - - override val primaryKey = PrimaryKey(id) -} diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index 68fa2a6..40d7ad2 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -1,113 +1,141 @@ package com.softwaremill.okapi.postgres +import com.softwaremill.okapi.core.ConnectionProvider import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore -import org.jetbrains.exposed.v1.core.IntegerColumnType -import org.jetbrains.exposed.v1.core.ResultRow -import org.jetbrains.exposed.v1.core.SortOrder -import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.eq -import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.core.vendors.ForUpdateOption -import org.jetbrains.exposed.v1.jdbc.select -import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager -import org.jetbrains.exposed.v1.jdbc.upsert +import java.sql.ResultSet +import java.sql.Timestamp import java.time.Clock import java.time.Instant +import java.util.UUID -/** PostgreSQL [OutboxStore] implementation using Exposed. */ +/** PostgreSQL [OutboxStore] implementation using plain JDBC. */ class PostgresOutboxStore( - private val clock: Clock, + private val connectionProvider: ConnectionProvider, + private val clock: Clock = Clock.systemUTC(), ) : OutboxStore { + override fun persist(entry: OutboxEntry): OutboxEntry { - OutboxTable.upsert { - it[id] = entry.outboxId - it[messageType] = entry.messageType - it[payload] = entry.payload - it[deliveryType] = entry.deliveryType - it[status] = entry.status.name - it[createdAt] = entry.createdAt - it[updatedAt] = entry.updatedAt - it[retries] = entry.retries - it[lastAttempt] = entry.lastAttempt - it[lastError] = entry.lastError - it[deliveryMetadata] = entry.deliveryMetadata + val sql = """ + INSERT INTO outbox (id, message_type, payload, delivery_type, status, created_at, updated_at, retries, last_attempt, last_error, delivery_metadata) + VALUES (?::uuid, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb) + ON CONFLICT (id) DO UPDATE SET + status = EXCLUDED.status, + updated_at = EXCLUDED.updated_at, + retries = EXCLUDED.retries, + last_attempt = EXCLUDED.last_attempt, + last_error = EXCLUDED.last_error, + delivery_metadata = EXCLUDED.delivery_metadata + """.trimIndent() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, entry.outboxId.raw.toString()) + stmt.setString(2, entry.messageType) + stmt.setString(3, entry.payload) + stmt.setString(4, entry.deliveryType) + stmt.setString(5, entry.status.name) + stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) + stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) + stmt.setInt(8, entry.retries) + if (entry.lastAttempt != null) { + stmt.setTimestamp( + 9, + Timestamp.from(entry.lastAttempt), + ) + } else { + stmt.setNull(9, java.sql.Types.TIMESTAMP) + } + if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) + stmt.setString(11, entry.deliveryMetadata) + stmt.executeUpdate() } return entry } override fun claimPending(limit: Int): List { - return OutboxTable - .select(OutboxTable.columns) - .where { OutboxTable.status eq OutboxStatus.PENDING.name } - .orderBy(OutboxTable.createdAt to SortOrder.ASC) - .limit(limit) - .forUpdate(ForUpdateOption.PostgreSQL.ForUpdate(mode = ForUpdateOption.PostgreSQL.MODE.SKIP_LOCKED)) - .map { it.toOutboxEntry() } + val sql = """ + SELECT * FROM outbox + WHERE status = ? + ORDER BY created_at ASC + LIMIT ? + FOR UPDATE SKIP LOCKED + """.trimIndent() + + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.PENDING.name) + stmt.setInt(2, limit) + stmt.executeQuery().use { rs -> + generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + } + } } override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) override fun removeDeliveredBefore(time: Instant, limit: Int): Int { val sql = """ - DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( - SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} - WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' - AND ${OutboxTable.lastAttempt.name} < ? - ORDER BY ${OutboxTable.id.name} + DELETE FROM outbox WHERE id IN ( + SELECT id FROM outbox + WHERE status = ? + AND last_attempt < ? + ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED ) """.trimIndent() - val statement = TransactionManager.current().connection.prepareStatement(sql, false) - statement.fillParameters( - listOf( - OutboxTable.lastAttempt.columnType to time, - IntegerColumnType() to limit, - ), - ) - return statement.executeUpdate() + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.DELIVERED.name) + stmt.setTimestamp(2, Timestamp.from(time)) + stmt.setInt(3, limit) + stmt.executeUpdate() + } } override fun findOldestCreatedAt(statuses: Set): Map { val result = statuses.associateWith { clock.instant() }.toMutableMap() - val minAlias = OutboxTable.createdAt.min().alias("min_created_at") - OutboxTable - .select(OutboxTable.status, minAlias) - .where { OutboxTable.status inList statuses.map { status -> status.name } } - .groupBy(OutboxTable.status) - .forEach { row -> - val s = OutboxStatus.from(row[OutboxTable.status]) - result[s] = requireNotNull(row[minAlias]) + val placeholders = statuses.joinToString(",") { "?" } + val sql = "SELECT status, MIN(created_at) AS min_created_at FROM outbox WHERE status IN ($placeholders) GROUP BY status" + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } + stmt.executeQuery().use { rs -> + while (rs.next()) { + val s = OutboxStatus.from(rs.getString("status")) + result[s] = rs.getTimestamp("min_created_at").toInstant() + } } + } return result } override fun countByStatuses(): Map { - val countAlias = OutboxTable.status.count().alias("count") - val counts = - OutboxTable - .select(OutboxTable.status, countAlias) - .groupBy(OutboxTable.status) - .associate { row -> OutboxStatus.from(row[OutboxTable.status]) to row[countAlias] } - return OutboxStatus.entries.associateWith { status -> counts[status] ?: 0L } + val sql = "SELECT status, COUNT(*) AS count FROM outbox GROUP BY status" + val counts = mutableMapOf() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.executeQuery().use { rs -> + while (rs.next()) { + counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + } + } + } + return OutboxStatus.entries.associateWith { counts[it] ?: 0L } } - private fun ResultRow.toOutboxEntry(): OutboxEntry = OutboxEntry( - outboxId = this[OutboxTable.id], - messageType = this[OutboxTable.messageType], - payload = this[OutboxTable.payload], - deliveryType = this[OutboxTable.deliveryType], - status = OutboxStatus.from(this[OutboxTable.status]), - createdAt = this[OutboxTable.createdAt], - updatedAt = this[OutboxTable.updatedAt], - retries = this[OutboxTable.retries], - lastAttempt = this[OutboxTable.lastAttempt], - lastError = this[OutboxTable.lastError], - deliveryMetadata = this[OutboxTable.deliveryMetadata], + private fun ResultSet.toOutboxEntry(): OutboxEntry = OutboxEntry( + outboxId = OutboxId(UUID.fromString(getString("id"))), + messageType = getString("message_type"), + payload = getString("payload"), + deliveryType = getString("delivery_type"), + status = OutboxStatus.from(getString("status")), + createdAt = getTimestamp("created_at").toInstant(), + updatedAt = getTimestamp("updated_at").toInstant(), + retries = getInt("retries"), + lastAttempt = getTimestamp("last_attempt")?.toInstant(), + lastError = getString("last_error"), + deliveryMetadata = getString("delivery_metadata"), ) } diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 395d4c2..2050d4b 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { compileOnly(libs.springContext) compileOnly(libs.springTx) + implementation(libs.springJdbc) compileOnly(libs.springBootAutoconfigure) // Validation annotations for @ConfigurationProperties classes @@ -33,10 +34,6 @@ dependencies { testImplementation(project(":okapi-postgres")) testImplementation(project(":okapi-mysql")) testImplementation(project(":okapi-http")) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) testImplementation(libs.liquibaseCore) testImplementation(libs.testcontainersPostgresql) testImplementation(libs.postgresql) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 123d367..4cdb2e8 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -126,10 +126,12 @@ class OutboxAutoConfiguration( fun outboxPurgerScheduler( props: OutboxPurgerProperties, outboxStore: OutboxStore, + transactionManager: ObjectProvider, clock: ObjectProvider, ): OutboxPurgerScheduler { return OutboxPurgerScheduler( outboxStore = outboxStore, + transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) }, config = OutboxPurgerConfig( retention = props.retention, interval = props.interval, @@ -153,8 +155,10 @@ class OutboxAutoConfiguration( ) { @Bean @ConditionalOnMissingBean(OutboxStore::class) - fun outboxStore(clock: ObjectProvider): PostgresOutboxStore = - PostgresOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() }) + fun outboxStore(clock: ObjectProvider): PostgresOutboxStore = PostgresOutboxStore( + connectionProvider = SpringConnectionProvider(resolveDataSource(dataSources, primaryDataSource, okapiProperties)), + clock = clock.getIfAvailable { Clock.systemUTC() }, + ) @Bean("okapiPostgresLiquibase") @ConditionalOnClass(SpringLiquibase::class) @@ -176,8 +180,10 @@ class OutboxAutoConfiguration( ) { @Bean @ConditionalOnMissingBean(OutboxStore::class) - fun outboxStore(clock: ObjectProvider): MysqlOutboxStore = - MysqlOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() }) + fun outboxStore(clock: ObjectProvider): MysqlOutboxStore = MysqlOutboxStore( + connectionProvider = SpringConnectionProvider(resolveDataSource(dataSources, primaryDataSource, okapiProperties)), + clock = clock.getIfAvailable { Clock.systemUTC() }, + ) @Bean("okapiMysqlLiquibase") @ConditionalOnClass(SpringLiquibase::class) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt index bd62b4f..1251ed3 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt @@ -4,6 +4,7 @@ import com.softwaremill.okapi.core.OutboxPurger import com.softwaremill.okapi.core.OutboxPurgerConfig import com.softwaremill.okapi.core.OutboxStore import org.springframework.context.SmartLifecycle +import org.springframework.transaction.support.TransactionTemplate import java.time.Clock /** @@ -14,12 +15,14 @@ import java.time.Clock */ class OutboxPurgerScheduler( outboxStore: OutboxStore, + transactionTemplate: TransactionTemplate? = null, config: OutboxPurgerConfig = OutboxPurgerConfig(), clock: Clock = Clock.systemUTC(), ) : SmartLifecycle { private val purger = OutboxPurger( outboxStore = outboxStore, + transactionRunner = transactionTemplate?.let { SpringTransactionRunner(it) }, config = config, clock = clock, ) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt new file mode 100644 index 0000000..6e921dc --- /dev/null +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt @@ -0,0 +1,18 @@ +package com.softwaremill.okapi.springboot + +import com.softwaremill.okapi.core.ConnectionProvider +import org.springframework.jdbc.datasource.DataSourceUtils +import java.sql.Connection +import javax.sql.DataSource + +/** + * Spring-aware [ConnectionProvider] that retrieves the JDBC connection + * bound to the current Spring-managed transaction. + * + * Uses [DataSourceUtils.getConnection] — the standard Spring mechanism + * that works transparently with any [org.springframework.transaction.PlatformTransactionManager]: + * JPA, JDBC, jOOQ, MyBatis, Exposed, etc. + */ +class SpringConnectionProvider(private val dataSource: DataSource) : ConnectionProvider { + override fun getConnection(): Connection = DataSourceUtils.getConnection(dataSource) +} diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt index 18153ba..e7ae83c 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt @@ -6,6 +6,8 @@ import com.github.tomakehurst.wiremock.client.WireMock.post import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.mysql.cj.jdbc.MysqlDataSource +import com.softwaremill.okapi.core.ConnectionProvider import com.softwaremill.okapi.core.OutboxEntryProcessor import com.softwaremill.okapi.core.OutboxMessage import com.softwaremill.okapi.core.OutboxProcessor @@ -22,8 +24,6 @@ import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import org.testcontainers.containers.MySQLContainer import java.sql.DriverManager import java.time.Clock @@ -36,17 +36,18 @@ class OutboxMysqlEndToEndTest : lateinit var store: MysqlOutboxStore lateinit var publisher: OutboxPublisher lateinit var processor: OutboxProcessor + lateinit var jdbc: TestJdbcConnectionProvider beforeSpec { mysql.start() wiremock.start() - Database.connect( - url = mysql.jdbcUrl, - driver = mysql.driverClassName, - user = mysql.username, - password = mysql.password, - ) + val dataSource = MysqlDataSource().apply { + setURL(mysql.jdbcUrl) + user = mysql.username + setPassword(mysql.password) + } + jdbc = TestJdbcConnectionProvider(dataSource) val connection = DriverManager.getConnection(mysql.jdbcUrl, mysql.username, mysql.password) val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) @@ -55,7 +56,7 @@ class OutboxMysqlEndToEndTest : connection.close() val clock = Clock.systemUTC() - store = MysqlOutboxStore(clock) + store = MysqlOutboxStore(jdbc, clock) publisher = OutboxPublisher(store, clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } @@ -71,7 +72,7 @@ class OutboxMysqlEndToEndTest : beforeEach { wiremock.resetAll() - transaction { exec("DELETE FROM outbox") } + jdbc.withTransaction { jdbc.getConnection().createStatement().use { it.execute("DELETE FROM outbox") } } } given("a message published within a transaction") { @@ -81,7 +82,7 @@ class OutboxMysqlEndToEndTest : .willReturn(aResponse().withStatus(200)), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"abc-123"}"""), httpDeliveryInfo { @@ -91,10 +92,10 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } val requests = wiremock.findAll(postRequestedFor(urlEqualTo("/api/notify"))) - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("WireMock receives exactly one POST request") { requests.size shouldBe 1 @@ -113,7 +114,7 @@ class OutboxMysqlEndToEndTest : .willReturn(aResponse().withStatus(500).withBody("Internal Server Error")), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"xyz-456"}"""), httpDeliveryInfo { @@ -123,9 +124,9 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("entry stays PENDING (retriable failure, retries remaining)") { counts[OutboxStatus.PENDING] shouldBe 1L @@ -141,7 +142,7 @@ class OutboxMysqlEndToEndTest : .willReturn(aResponse().withStatus(400).withBody("Bad Request")), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"err-789"}"""), httpDeliveryInfo { @@ -151,9 +152,9 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("entry is immediately FAILED (permanent failure)") { counts[OutboxStatus.FAILED] shouldBe 1L @@ -174,7 +175,7 @@ class OutboxMysqlEndToEndTest : ), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"net-000"}"""), httpDeliveryInfo { @@ -184,9 +185,9 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("entry stays PENDING (retriable network failure)") { counts[OutboxStatus.PENDING] shouldBe 1L @@ -194,3 +195,27 @@ class OutboxMysqlEndToEndTest : } } }) + +private class TestJdbcConnectionProvider(private val dataSource: javax.sql.DataSource) : ConnectionProvider { + private val threadLocalConnection = ThreadLocal() + + override fun getConnection(): java.sql.Connection = threadLocalConnection.get() + ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + + fun withTransaction(block: () -> T): T { + val conn = dataSource.connection + conn.autoCommit = false + threadLocalConnection.set(conn) + return try { + val result = block() + conn.commit() + result + } catch (e: Exception) { + conn.rollback() + throw e + } finally { + threadLocalConnection.remove() + conn.close() + } + } +} From 4c8ee9dda3966fc8f98f1d0780e40b59de907362 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 15:51:45 +0200 Subject: [PATCH 2/7] docs: update README to reflect JDBC migration (remove Exposed references) --- README.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f76cb9f..602d153 100644 --- a/README.md +++ b/README.md @@ -78,8 +78,8 @@ springOutboxPublisher.publish( **Using MySQL instead of PostgreSQL?** Replace `okapi-postgres` with `okapi-mysql` in your dependencies — no code changes needed. -> **Note:** `okapi-postgres` and `okapi-mysql` require Exposed ORM dependencies in your project. -> Spring and Kafka versions are not forced by okapi — you control them. +> **Note:** Spring and Kafka versions are not forced by okapi — you control them. +> Okapi uses plain JDBC internally — it works with any `PlatformTransactionManager` (JPA, JDBC, jOOQ, Exposed, etc.). ## How It Works @@ -179,9 +179,9 @@ graph BT | Module | Purpose | |--------|---------| -| `okapi-core` | Transport/storage-agnostic orchestration, scheduling, retry policy | -| `okapi-postgres` | PostgreSQL storage via Exposed ORM (`FOR UPDATE SKIP LOCKED`) | -| `okapi-mysql` | MySQL 8+ storage via Exposed ORM | +| `okapi-core` | Transport/storage-agnostic orchestration, scheduling, retry policy, `ConnectionProvider` interface | +| `okapi-postgres` | PostgreSQL storage via plain JDBC (`FOR UPDATE SKIP LOCKED`) | +| `okapi-mysql` | MySQL 8+ storage via plain JDBC | | `okapi-http` | HTTP webhook delivery (JDK HttpClient) | | `okapi-kafka` | Kafka topic publishing | | `okapi-micrometer` | Micrometer metrics (counters, timers, gauges) | @@ -195,7 +195,6 @@ graph BT | Java | 21+ | Required | | Spring Boot | 3.5.x, 4.0.x | `okapi-spring-boot` module | | Kafka Clients | 3.9.x, 4.x | `okapi-kafka` — you provide `kafka-clients` | -| Exposed | 1.x | `okapi-postgres`, `okapi-mysql` — you provide Exposed | ## Build From f4860b22fb7b26ca72d9a3cf938e38b169e19f3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 15:53:38 +0200 Subject: [PATCH 3/7] docs: restore Exposed in compatibility table as compatible (not required) --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 602d153..09f65db 100644 --- a/README.md +++ b/README.md @@ -195,6 +195,7 @@ graph BT | Java | 21+ | Required | | Spring Boot | 3.5.x, 4.0.x | `okapi-spring-boot` module | | Kafka Clients | 3.9.x, 4.x | `okapi-kafka` — you provide `kafka-clients` | +| Exposed | 1.x | Compatible — okapi doesn't depend on Exposed, but works alongside it via `ConnectionProvider` | ## Build From 1f564b5074fcb8da9585f63cafb9bb2bacad5cd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 16:05:49 +0200 Subject: [PATCH 4/7] docs: clarify Exposed as optional in compatibility table --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 09f65db..e5f31dc 100644 --- a/README.md +++ b/README.md @@ -195,7 +195,7 @@ graph BT | Java | 21+ | Required | | Spring Boot | 3.5.x, 4.0.x | `okapi-spring-boot` module | | Kafka Clients | 3.9.x, 4.x | `okapi-kafka` — you provide `kafka-clients` | -| Exposed | 1.x | Compatible — okapi doesn't depend on Exposed, but works alongside it via `ConnectionProvider` | +| Exposed | 1.x | Optional — `ExposedTransactionContextValidator` for Ktor/standalone apps (will move to `okapi-exposed` module) | ## Build From c5bbbbc6a4a8ea8a559a9abd493c981d92ff695d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 16:13:34 +0200 Subject: [PATCH 5/7] feat: extract okapi-exposed module from okapi-core (KOJAK-40) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move ExposedTransactionContextValidator out of okapi-core into a dedicated okapi-exposed module. Add ExposedConnectionProvider and ExposedTransactionRunner for Ktor/standalone Exposed apps. okapi-core is now free of Exposed dependencies — only plain JDK types. --- README.md | 4 +++- okapi-bom/build.gradle.kts | 1 + okapi-core/build.gradle.kts | 3 --- .../okapi/core/TransactionContextValidator.kt | 5 ++--- okapi-exposed/build.gradle.kts | 15 ++++++++++++++ .../exposed/ExposedConnectionProvider.kt | 20 +++++++++++++++++++ .../ExposedTransactionContextValidator.kt | 3 ++- .../okapi/exposed/ExposedTransactionRunner.kt | 20 +++++++++++++++++++ .../ExposedTransactionContextValidatorTest.kt | 2 +- settings.gradle.kts | 1 + 10 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 okapi-exposed/build.gradle.kts create mode 100644 okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt rename {okapi-core/src/main/kotlin/com/softwaremill/okapi/core => okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed}/ExposedTransactionContextValidator.kt (92%) create mode 100644 okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionRunner.kt rename {okapi-core/src/test/kotlin/com/softwaremill/okapi/core => okapi-exposed/src/test/kotlin/com/softwaremill/okapi/exposed}/ExposedTransactionContextValidatorTest.kt (98%) diff --git a/README.md b/README.md index e5f31dc..597e3e1 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,7 @@ graph BT HTTP[okapi-http] --> CORE KAFKA[okapi-kafka] --> CORE MICRO[okapi-micrometer] --> CORE + EXP[okapi-exposed] --> CORE SPRING[okapi-spring-boot] --> CORE SPRING -.->|compileOnly| PG SPRING -.->|compileOnly| MY @@ -180,6 +181,7 @@ graph BT | Module | Purpose | |--------|---------| | `okapi-core` | Transport/storage-agnostic orchestration, scheduling, retry policy, `ConnectionProvider` interface | +| `okapi-exposed` | Exposed ORM integration — `ExposedConnectionProvider`, `ExposedTransactionRunner`, `ExposedTransactionContextValidator` | | `okapi-postgres` | PostgreSQL storage via plain JDBC (`FOR UPDATE SKIP LOCKED`) | | `okapi-mysql` | MySQL 8+ storage via plain JDBC | | `okapi-http` | HTTP webhook delivery (JDK HttpClient) | @@ -195,7 +197,7 @@ graph BT | Java | 21+ | Required | | Spring Boot | 3.5.x, 4.0.x | `okapi-spring-boot` module | | Kafka Clients | 3.9.x, 4.x | `okapi-kafka` — you provide `kafka-clients` | -| Exposed | 1.x | Optional — `ExposedTransactionContextValidator` for Ktor/standalone apps (will move to `okapi-exposed` module) | +| Exposed | 1.x | `okapi-exposed` module — for Ktor/standalone apps | ## Build diff --git a/okapi-bom/build.gradle.kts b/okapi-bom/build.gradle.kts index 03c6076..d31e2f5 100644 --- a/okapi-bom/build.gradle.kts +++ b/okapi-bom/build.gradle.kts @@ -8,6 +8,7 @@ description = "BOM for consistent versioning of Okapi modules" dependencies { constraints { api(project(":okapi-core")) + api(project(":okapi-exposed")) api(project(":okapi-postgres")) api(project(":okapi-mysql")) api(project(":okapi-http")) diff --git a/okapi-core/build.gradle.kts b/okapi-core/build.gradle.kts index 691f8a8..a2fee38 100644 --- a/okapi-core/build.gradle.kts +++ b/okapi-core/build.gradle.kts @@ -7,10 +7,7 @@ description = "Core outbox abstractions and processing engine" dependencies { implementation(libs.slf4jApi) - compileOnly(libs.exposedJdbc) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.h2) testRuntimeOnly(libs.slf4jSimple) } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt index bcf51a0..3b30582 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt @@ -4,9 +4,8 @@ package com.softwaremill.okapi.core * Checks whether the current execution context is inside an active read-write transaction. * * Framework-specific modules provide implementations: - * - `okapi-spring`: [SpringTransactionContextValidator][com.softwaremill.okapi.springboot.SpringTransactionContextValidator] - * — checks via `TransactionSynchronizationManager` - * - `okapi-core`: [ExposedTransactionContextValidator] — checks via Exposed's `TransactionManager.currentOrNull()` + * - `okapi-spring-boot`: `SpringTransactionContextValidator` — checks via `TransactionSynchronizationManager` + * - `okapi-exposed`: `ExposedTransactionContextValidator` — checks via Exposed's `TransactionManager.currentOrNull()` * - Standalone: no-op (always returns true) */ interface TransactionContextValidator { diff --git a/okapi-exposed/build.gradle.kts b/okapi-exposed/build.gradle.kts new file mode 100644 index 0000000..033d656 --- /dev/null +++ b/okapi-exposed/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("buildsrc.convention.kotlin-jvm") + id("buildsrc.convention.publish") +} + +description = "Exposed ORM integration — ConnectionProvider, TransactionRunner, TransactionContextValidator" + +dependencies { + api(project(":okapi-core")) + implementation(libs.exposedJdbc) + + testImplementation(libs.kotestRunnerJunit5) + testImplementation(libs.kotestAssertionsCore) + testImplementation(libs.h2) +} diff --git a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt new file mode 100644 index 0000000..fe96b58 --- /dev/null +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt @@ -0,0 +1,20 @@ +package com.softwaremill.okapi.exposed + +import com.softwaremill.okapi.core.ConnectionProvider +import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager +import java.sql.Connection + +/** + * Exposed implementation of [ConnectionProvider]. + * + * Retrieves the JDBC [Connection] from the current Exposed transaction. + * Use this when your application manages transactions via Exposed's + * `transaction(database) { }` blocks (e.g., Ktor + Exposed apps). + * + * The returned connection is **borrowed** from Exposed's active transaction — + * the caller must NOT close it. + */ +class ExposedConnectionProvider : ConnectionProvider { + override fun getConnection(): Connection = + TransactionManager.current().connection.connection as Connection +} diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidator.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidator.kt similarity index 92% rename from okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidator.kt rename to okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidator.kt index 685e5fc..5875443 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidator.kt +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidator.kt @@ -1,5 +1,6 @@ -package com.softwaremill.okapi.core +package com.softwaremill.okapi.exposed +import com.softwaremill.okapi.core.TransactionContextValidator import org.jetbrains.exposed.v1.jdbc.Database import org.jetbrains.exposed.v1.jdbc.transactions.currentOrNull import org.jetbrains.exposed.v1.jdbc.transactions.transactionManager diff --git a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionRunner.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionRunner.kt new file mode 100644 index 0000000..ca800c6 --- /dev/null +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionRunner.kt @@ -0,0 +1,20 @@ +package com.softwaremill.okapi.exposed + +import com.softwaremill.okapi.core.TransactionRunner +import org.jetbrains.exposed.v1.jdbc.Database +import org.jetbrains.exposed.v1.jdbc.transactions.transaction + +/** + * Exposed implementation of [TransactionRunner]. + * + * Wraps the block in Exposed's `transaction(database) { }`. + * Used by the outbox scheduler/processor when running outside of + * an existing transactional context (e.g., background processing thread). + * + * @param database The [Database] instance where the outbox table resides. + */ +class ExposedTransactionRunner( + private val database: Database, +) : TransactionRunner { + override fun runInTransaction(block: () -> T): T = transaction(database) { block() } +} diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidatorTest.kt b/okapi-exposed/src/test/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidatorTest.kt similarity index 98% rename from okapi-core/src/test/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidatorTest.kt rename to okapi-exposed/src/test/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidatorTest.kt index 2ae30d8..f1b4c10 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidatorTest.kt +++ b/okapi-exposed/src/test/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidatorTest.kt @@ -1,4 +1,4 @@ -package com.softwaremill.okapi.core +package com.softwaremill.okapi.exposed import io.kotest.core.spec.style.BehaviorSpec import io.kotest.matchers.shouldBe diff --git a/settings.gradle.kts b/settings.gradle.kts index 832da0b..c1b49fe 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -10,6 +10,7 @@ plugins { } include("okapi-core") +include("okapi-exposed") include("okapi-postgres") include("okapi-mysql") include("okapi-http") From 68f36522930ecc5838d0b2945823a6fb1c0cdfc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 20:06:43 +0200 Subject: [PATCH 6/7] fix: ktlint formatting in ExposedConnectionProvider --- .../softwaremill/okapi/exposed/ExposedConnectionProvider.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt index fe96b58..939904a 100644 --- a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt @@ -15,6 +15,5 @@ import java.sql.Connection * the caller must NOT close it. */ class ExposedConnectionProvider : ConnectionProvider { - override fun getConnection(): Connection = - TransactionManager.current().connection.connection as Connection + override fun getConnection(): Connection = TransactionManager.current().connection.connection as Connection } From 9470e4d38c97f9922e005aebb2bc747fa48936c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Fri, 24 Apr 2026 06:20:15 +0200 Subject: [PATCH 7/7] fix: prevent connection leak in SpringConnectionProvider (KOJAK-64) ConnectionProvider.getConnection(): Connection returned a JDBC connection with an implicit "do not close" contract that only held inside an active Spring transaction. Outside one, DataSourceUtils.getConnection fetches a fresh pool connection that the caller is contractually required to release via DataSourceUtils.releaseConnection - and the bare-getter shape left no hook to do so. Every store call made outside a Spring transaction (processor/purger tick without a PlatformTransactionManager, actuator-style read calls, etc.) leaked a pooled connection. Migrate ConnectionProvider to an execute-around API: fun withConnection(block: (Connection) -> T): T The provider now owns both acquire and release. SpringConnectionProvider pairs DataSourceUtils.getConnection with DataSourceUtils.releaseConnection in a try/finally - a no-op when the connection is transaction-bound, a pool-return otherwise. ExposedConnectionProvider reads from the active Exposed TransactionManager (no cleanup - Exposed owns lifecycle). Test JdbcConnectionProvider keeps the ThreadLocal-backed scope. PostgresOutboxStore and MysqlOutboxStore were rewritten to nest their PreparedStatement.use { } blocks inside withConnection { }, making the connection scope explicit on each call site. OutboxAutoConfiguration KDoc previously claimed "if PlatformTransactionManager is absent, each store call runs in its own transaction" - true only at the JDBC auto-commit level, misleading about the concurrency semantics. Rewrote the note to spell out what auto-commit means here and to recommend configuring a PlatformTransactionManager for multi-instance deployments. Dependency scope: okapi-spring-boot now has production use of DataSourceUtils from spring-jdbc, so the spring-jdbc dependency moves from implementation to compileOnly to match the rest of the Spring framework dependencies in this module. Every okapi-spring-boot consumer already has spring-jdbc on their classpath via spring-boot-starter-jdbc or spring-boot-starter-data-jpa, so compileOnly avoids forcing a specific version transitively. Coverage: * SpringConnectionProviderTest - unit (H2 + TransactionSynchronizationManager): tx-bound reuse, outside-tx release, exception-safety, repeat. * ConnectionLeakProofTest - integration (Postgres Testcontainer): wraps the real DataSource with a counting proxy, asserts opened == closed across read-only and full-lifecycle store calls, and asserts the effects (claim returns the persisted entry, DELIVERED count transitions through 1 and 0 around the purge). * MysqlConnectionLeakProofTest - integration (MySQL Testcontainer): same assertions against the MySQL driver, whose pool-return semantics differ from pgjdbc. CountingDataSource was extracted to okapi-integration-tests/support so the Postgres and MySQL leak-proof tests share the same invariant check. Both the opened and closed counters increment only after the wrapped delegate operation succeeds; if wrapping a freshly opened connection fails, the delegate is closed before rethrowing so no physical connection escapes. The runLiquibase helpers in both leak-proof tests use DriverManager.getConnection(...).use { } so a Liquibase setup failure cannot leak a connection in the very tests that prove absence of leaks. ObservabilityEndToEndTest (added on main after this branch was cut, KOJAK-44) was written against the pre-migration Exposed-based PostgresOutboxStore API; updated here to use the JDBC ConnectionProvider via PostgresTestSupport.jdbc.withTransaction { } so it keeps passing after the JDBC migration. Addresses review feedback on PR #26. --- .../okapi/core/ConnectionProvider.kt | 20 ++-- .../exposed/ExposedConnectionProvider.kt | 17 ++- .../test/e2e/ObservabilityEndToEndTest.kt | 35 +++--- .../okapi/test/support/CountingDataSource.kt | 40 +++++++ .../test/support/JdbcConnectionProvider.kt | 10 +- .../okapi/test/support/MysqlTestSupport.kt | 4 +- .../okapi/test/support/PostgresTestSupport.kt | 4 +- .../transaction/ConnectionLeakProofTest.kt | 110 ++++++++++++++++++ .../MysqlConnectionLeakProofTest.kt | 109 +++++++++++++++++ .../okapi/mysql/MysqlOutboxStore.kt | 88 +++++++------- .../okapi/postgres/PostgresOutboxStore.kt | 88 +++++++------- okapi-spring-boot/build.gradle.kts | 3 +- .../springboot/OutboxAutoConfiguration.kt | 5 +- .../springboot/SpringConnectionProvider.kt | 19 ++- .../springboot/OutboxMysqlEndToEndTest.kt | 11 +- .../SpringConnectionProviderTest.kt | 106 +++++++++++++++++ 16 files changed, 543 insertions(+), 126 deletions(-) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/CountingDataSource.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MysqlConnectionLeakProofTest.kt create mode 100644 okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProviderTest.kt diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt index 6586ffb..d3e6d65 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt @@ -3,15 +3,19 @@ package com.softwaremill.okapi.core import java.sql.Connection /** - * Provides a JDBC [Connection] from the current transactional context. + * Supplies a JDBC [Connection] to a user block, binding its lifetime to the block's scope. * - * Implementations bridge okapi's [OutboxStore] with the caller's transaction mechanism: - * - `okapi-spring-boot`: uses `DataSourceUtils.getConnection()` — works with JPA, JDBC, jOOQ, MyBatis, Exposed - * - `okapi-exposed`: uses Exposed's `TransactionManager.current().connection` — for Ktor/standalone Exposed - * - Standalone: user-provided lambda wrapping a `DataSource` or `ThreadLocal` + * Implementations bridge okapi's [OutboxStore] with the caller's transaction mechanism + * (Spring, Exposed, standalone). The connection passed to [block] is **borrowed** — + * callers must never close it. Disposal is handled either by the provider itself + * (e.g. Spring's `DataSourceUtils.releaseConnection` after the block returns or throws) + * or by an outer scope that supplies the connection (e.g. an Exposed `transaction { }` + * block, a test's thread-local binding). * - * The returned connection is **borrowed** from the current transaction — the caller must NOT close it. + * Some implementations require an active transaction on the calling thread + * (e.g. `okapi-exposed` throws if `TransactionManager.current()` is unset); + * others work both inside and outside of one (`okapi-spring-boot`). */ -fun interface ConnectionProvider { - fun getConnection(): Connection +interface ConnectionProvider { + fun withConnection(block: (Connection) -> T): T } diff --git a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt index 939904a..0c5419b 100644 --- a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt @@ -7,13 +7,18 @@ import java.sql.Connection /** * Exposed implementation of [ConnectionProvider]. * - * Retrieves the JDBC [Connection] from the current Exposed transaction. - * Use this when your application manages transactions via Exposed's - * `transaction(database) { }` blocks (e.g., Ktor + Exposed apps). + * Reads the JDBC [Connection] from Exposed's active `TransactionManager.current()` and + * passes it to the caller's block. Exposed owns the connection's lifecycle — it commits + * or rolls back, and returns the connection to the pool when the enclosing + * `transaction(database) { }` block completes — so this provider performs no cleanup. * - * The returned connection is **borrowed** from Exposed's active transaction — - * the caller must NOT close it. + * Use when your application manages transactions via Exposed (e.g. Ktor + Exposed apps). + * Must be called from within an active Exposed transaction; otherwise + * `TransactionManager.current()` throws. */ class ExposedConnectionProvider : ConnectionProvider { - override fun getConnection(): Connection = TransactionManager.current().connection.connection as Connection + override fun withConnection(block: (Connection) -> T): T { + val connection = TransactionManager.current().connection.connection as Connection + return block(connection) + } } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt index a11b891..2945040 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt @@ -22,7 +22,6 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.doubles.shouldBeGreaterThanOrEqual import io.kotest.matchers.shouldBe import io.micrometer.core.instrument.simple.SimpleMeterRegistry -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Clock import java.util.concurrent.TimeUnit @@ -31,8 +30,8 @@ class ObservabilityEndToEndTest : FunSpec({ val wiremock = WireMockServer(wireMockConfig().dynamicPort()) val clock = Clock.systemUTC() - val exposedTransactionRunner = object : TransactionRunner { - override fun runInTransaction(block: () -> T): T = transaction { block() } + val jdbcTransactionRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = db.jdbc.withTransaction { block() } } beforeSpec { @@ -57,10 +56,10 @@ class ObservabilityEndToEndTest : FunSpec({ test("full pipeline: publish, deliver, verify Micrometer counters and gauges") { val registry = SimpleMeterRegistry() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val listener = MicrometerOutboxListener(registry) - val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = jdbcTransactionRunner, clock = clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) @@ -82,10 +81,10 @@ class ObservabilityEndToEndTest : FunSpec({ ) // Publish 1 message - transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-1"}"""), deliveryInfo()) } // First processNext: HTTP 500 → RetryScheduled - transaction { processor.processNext() } + db.jdbc.withTransaction { processor.processNext() } registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 registry.counter("okapi.entries.delivered").count() shouldBe 0.0 @@ -97,7 +96,7 @@ class ObservabilityEndToEndTest : FunSpec({ registry.find("okapi.entries.count").tag("status", "pending").gauge()!!.value() shouldBe 1.0 // Second processNext: HTTP 200 → Delivered - transaction { processor.processNext() } + db.jdbc.withTransaction { processor.processNext() } registry.counter("okapi.entries.delivered").count() shouldBe 1.0 registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 // still 1 from before @@ -111,10 +110,10 @@ class ObservabilityEndToEndTest : FunSpec({ test("permanent failure: HTTP 400 → Failed counter incremented, gauge reflects FAILED") { val registry = SimpleMeterRegistry() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val listener = MicrometerOutboxListener(registry) - val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = jdbcTransactionRunner, clock = clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) @@ -125,8 +124,8 @@ class ObservabilityEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(400)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-2"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-2"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } registry.counter("okapi.entries.failed").count() shouldBe 1.0 registry.counter("okapi.entries.delivered").count() shouldBe 0.0 @@ -137,7 +136,7 @@ class ObservabilityEndToEndTest : FunSpec({ test("batch duration timer records realistic delivery time") { val registry = SimpleMeterRegistry() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val listener = MicrometerOutboxListener(registry) @@ -150,8 +149,8 @@ class ObservabilityEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(200).withFixedDelay(50)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-3"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-3"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } val timer = registry.timer("okapi.batch.duration") timer.count() shouldBe 1 @@ -160,12 +159,12 @@ class ObservabilityEndToEndTest : FunSpec({ test("lag gauge reflects real time difference for pending entries") { val registry = SimpleMeterRegistry() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) - val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + val metrics = MicrometerOutboxMetrics(store, registry, transactionRunner = jdbcTransactionRunner, clock = clock) // Publish but don't process — entry stays PENDING - transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-4"}"""), deliveryInfo()) } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-4"}"""), deliveryInfo()) } // Small sleep to create measurable lag Thread.sleep(100) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/CountingDataSource.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/CountingDataSource.kt new file mode 100644 index 0000000..1c884ff --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/CountingDataSource.kt @@ -0,0 +1,40 @@ +package com.softwaremill.okapi.test.support + +import java.sql.Connection +import java.util.concurrent.atomic.AtomicInteger +import javax.sql.DataSource + +/** + * Counting [DataSource] wrapper used to assert that every physical connection borrowed + * from the pool is released. Both [opened] and [closed] increment only after the wrapped + * delegate operation succeeds — if either `delegate.connection` or `delegate.close()` + * throws, the counters stay consistent and the leak-proof assertion reflects reality. + * On a failure to wrap a freshly opened connection, the delegate is closed before + * rethrowing so no physical connection escapes. + */ +class CountingDataSource(val delegate: DataSource) : DataSource by delegate { + val opened = AtomicInteger(0) + val closed = AtomicInteger(0) + + override fun getConnection(): Connection { + val connection = delegate.connection + try { + val wrapped = CountingConnection(connection, closed) + opened.incrementAndGet() + return wrapped + } catch (t: Throwable) { + connection.close() + throw t + } + } + + private class CountingConnection( + private val delegate: Connection, + private val closed: AtomicInteger, + ) : Connection by delegate { + override fun close() { + delegate.close() + closed.incrementAndGet() + } + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt index eb7241c..85256e1 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt @@ -6,13 +6,17 @@ import javax.sql.DataSource /** * Test helper that provides a [ConnectionProvider] backed by a [ThreadLocal] connection. - * Use [withTransaction] to bind a JDBC connection for the duration of a block. + * Use [withTransaction] to bind a JDBC connection for the duration of a block; + * that outer scope owns commit/rollback and close. */ class JdbcConnectionProvider(private val dataSource: DataSource) : ConnectionProvider { private val threadLocalConnection = ThreadLocal() - override fun getConnection(): Connection = threadLocalConnection.get() - ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + override fun withConnection(block: (Connection) -> T): T { + val connection = threadLocalConnection.get() + ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + return block(connection) + } fun withTransaction(block: () -> T): T = withTransaction(transactionIsolation = null, block) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt index e883eeb..4d16282 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt @@ -31,7 +31,9 @@ class MysqlTestSupport { fun truncate() { jdbc.withTransaction { - jdbc.getConnection().createStatement().use { it.execute("DELETE FROM outbox") } + jdbc.withConnection { conn -> + conn.createStatement().use { it.execute("DELETE FROM outbox") } + } } } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt index aab6a75..04d3cf6 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt @@ -31,7 +31,9 @@ class PostgresTestSupport { fun truncate() { jdbc.withTransaction { - jdbc.getConnection().createStatement().use { it.execute("TRUNCATE TABLE outbox") } + jdbc.withConnection { conn -> + conn.createStatement().use { it.execute("TRUNCATE TABLE outbox") } + } } } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt new file mode 100644 index 0000000..2c76dcb --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ConnectionLeakProofTest.kt @@ -0,0 +1,110 @@ +package com.softwaremill.okapi.test.transaction + +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxId +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.springboot.SpringConnectionProvider +import com.softwaremill.okapi.test.support.CountingDataSource +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import liquibase.Liquibase +import liquibase.database.DatabaseFactory +import liquibase.database.jvm.JdbcConnection +import liquibase.resource.ClassLoaderResourceAccessor +import org.postgresql.ds.PGSimpleDataSource +import org.testcontainers.containers.PostgreSQLContainer +import java.sql.DriverManager +import java.time.Clock + +/** + * Proves that [SpringConnectionProvider] releases every JDBC connection it borrows from + * the pool when store methods are called outside a Spring transaction — the exact path + * that previously leaked because the old `getConnection(): Connection` contract had no + * release hook. Every physical `Connection.close()` is counted via a wrapping DataSource, + * and the assertion `opened == closed` must hold at the end of each test. + */ +class ConnectionLeakProofTest : FunSpec({ + + val container = PostgreSQLContainer("postgres:16") + lateinit var counter: CountingDataSource + lateinit var store: PostgresOutboxStore + val clock: Clock = Clock.systemUTC() + + beforeSpec { + container.start() + val raw = PGSimpleDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + password = container.password + } + runLiquibase(container) + counter = CountingDataSource(raw) + store = PostgresOutboxStore(SpringConnectionProvider(counter), clock) + } + + afterSpec { + container.stop() + } + + beforeEach { + counter.delegate.connection.use { conn -> + conn.createStatement().use { it.execute("TRUNCATE TABLE outbox") } + } + counter.opened.set(0) + counter.closed.set(0) + } + + test("read-only store methods release every borrowed connection") { + val iterations = 25 + val methodsPerIteration = 3 + repeat(iterations) { + store.countByStatuses() + store.findOldestCreatedAt(setOf(OutboxStatus.PENDING, OutboxStatus.DELIVERED)) + store.claimPending(10) + } + + counter.opened.get() shouldBe counter.closed.get() + counter.opened.get() shouldBe iterations * methodsPerIteration + } + + test("full lifecycle (persist, claim, update, purge) releases every borrowed connection") { + val now = clock.instant() + val entry = OutboxEntry( + outboxId = OutboxId.new(), + messageType = "test.event", + payload = """{"k":"v"}""", + deliveryType = "stub", + status = OutboxStatus.PENDING, + createdAt = now, + updatedAt = now, + retries = 0, + lastAttempt = null, + lastError = null, + deliveryMetadata = """{"stub":true}""", + ) + + store.persist(entry) + val claimed = store.claimPending(10) + claimed.size shouldBe 1 + claimed.first().outboxId shouldBe entry.outboxId + + store.updateAfterProcessing( + claimed.first().copy(status = OutboxStatus.DELIVERED, lastAttempt = clock.instant()), + ) + store.countByStatuses()[OutboxStatus.DELIVERED] shouldBe 1L + + val removed = store.removeDeliveredBefore(clock.instant().plusSeconds(3600), 10) + removed shouldBe 1 + store.countByStatuses()[OutboxStatus.DELIVERED] shouldBe 0L + + counter.opened.get() shouldBe counter.closed.get() + } +}) + +private fun runLiquibase(container: PostgreSQLContainer) { + DriverManager.getConnection(container.jdbcUrl, container.username, container.password).use { connection -> + val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) + Liquibase("com/softwaremill/okapi/db/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") } + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MysqlConnectionLeakProofTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MysqlConnectionLeakProofTest.kt new file mode 100644 index 0000000..bc1eb40 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MysqlConnectionLeakProofTest.kt @@ -0,0 +1,109 @@ +package com.softwaremill.okapi.test.transaction + +import com.mysql.cj.jdbc.MysqlDataSource +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxId +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.mysql.MysqlOutboxStore +import com.softwaremill.okapi.springboot.SpringConnectionProvider +import com.softwaremill.okapi.test.support.CountingDataSource +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import liquibase.Liquibase +import liquibase.database.DatabaseFactory +import liquibase.database.jvm.JdbcConnection +import liquibase.resource.ClassLoaderResourceAccessor +import org.testcontainers.containers.MySQLContainer +import java.sql.DriverManager +import java.time.Clock + +/** + * MySQL mirror of [ConnectionLeakProofTest]: proves that [SpringConnectionProvider] plus + * [MysqlOutboxStore] release every borrowed connection when called outside a Spring + * transaction. MySQL Connector/J has different pool-return semantics than pgjdbc; covering + * both drivers guards against a driver-specific regression. + */ +class MysqlConnectionLeakProofTest : FunSpec({ + + val container = MySQLContainer("mysql:8.0") + lateinit var counter: CountingDataSource + lateinit var store: MysqlOutboxStore + val clock: Clock = Clock.systemUTC() + + beforeSpec { + container.start() + val raw = MysqlDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + setPassword(container.password) + } + runLiquibase(container) + counter = CountingDataSource(raw) + store = MysqlOutboxStore(SpringConnectionProvider(counter), clock) + } + + afterSpec { + container.stop() + } + + beforeEach { + counter.delegate.connection.use { conn -> + conn.createStatement().use { it.execute("DELETE FROM outbox") } + } + counter.opened.set(0) + counter.closed.set(0) + } + + test("read-only store methods release every borrowed connection") { + val iterations = 25 + val methodsPerIteration = 3 + repeat(iterations) { + store.countByStatuses() + store.findOldestCreatedAt(setOf(OutboxStatus.PENDING, OutboxStatus.DELIVERED)) + store.claimPending(10) + } + + counter.opened.get() shouldBe counter.closed.get() + counter.opened.get() shouldBe iterations * methodsPerIteration + } + + test("full lifecycle (persist, claim, update, purge) releases every borrowed connection") { + val now = clock.instant() + val entry = OutboxEntry( + outboxId = OutboxId.new(), + messageType = "test.event", + payload = """{"k":"v"}""", + deliveryType = "stub", + status = OutboxStatus.PENDING, + createdAt = now, + updatedAt = now, + retries = 0, + lastAttempt = null, + lastError = null, + deliveryMetadata = """{"stub":true}""", + ) + + store.persist(entry) + val claimed = store.claimPending(10) + claimed.size shouldBe 1 + claimed.first().outboxId shouldBe entry.outboxId + + store.updateAfterProcessing( + claimed.first().copy(status = OutboxStatus.DELIVERED, lastAttempt = clock.instant()), + ) + store.countByStatuses()[OutboxStatus.DELIVERED] shouldBe 1L + + val removed = store.removeDeliveredBefore(clock.instant().plusSeconds(3600), 10) + removed shouldBe 1 + store.countByStatuses()[OutboxStatus.DELIVERED] shouldBe 0L + + counter.opened.get() shouldBe counter.closed.get() + } +}) + +private fun runLiquibase(container: MySQLContainer) { + DriverManager.getConnection(container.jdbcUrl, container.username, container.password).use { connection -> + val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) + Liquibase("com/softwaremill/okapi/db/mysql/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") } + } +} diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index fd2e824..14ab76c 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -30,26 +30,28 @@ class MysqlOutboxStore( delivery_metadata = VALUES(delivery_metadata) """.trimIndent() - connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.setString(1, entry.outboxId.raw.toString()) - stmt.setString(2, entry.messageType) - stmt.setString(3, entry.payload) - stmt.setString(4, entry.deliveryType) - stmt.setString(5, entry.status.name) - stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) - stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) - stmt.setInt(8, entry.retries) - if (entry.lastAttempt != null) { - stmt.setTimestamp( - 9, - Timestamp.from(entry.lastAttempt), - ) - } else { - stmt.setNull(9, java.sql.Types.TIMESTAMP) + connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.setString(1, entry.outboxId.raw.toString()) + stmt.setString(2, entry.messageType) + stmt.setString(3, entry.payload) + stmt.setString(4, entry.deliveryType) + stmt.setString(5, entry.status.name) + stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) + stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) + stmt.setInt(8, entry.retries) + if (entry.lastAttempt != null) { + stmt.setTimestamp( + 9, + Timestamp.from(entry.lastAttempt), + ) + } else { + stmt.setNull(9, java.sql.Types.TIMESTAMP) + } + if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) + stmt.setString(11, entry.deliveryMetadata) + stmt.executeUpdate() } - if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) - stmt.setString(11, entry.deliveryMetadata) - stmt.executeUpdate() } return entry } @@ -67,11 +69,13 @@ class MysqlOutboxStore( FOR UPDATE SKIP LOCKED """.trimIndent() - return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.setString(1, OutboxStatus.PENDING.name) - stmt.setInt(2, limit) - stmt.executeQuery().use { rs -> - generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + return connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.PENDING.name) + stmt.setInt(2, limit) + stmt.executeQuery().use { rs -> + generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + } } } } @@ -92,11 +96,13 @@ class MysqlOutboxStore( ) """.trimIndent() - return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.setString(1, OutboxStatus.DELIVERED.name) - stmt.setTimestamp(2, Timestamp.from(time)) - stmt.setInt(3, limit) - stmt.executeUpdate() + return connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.DELIVERED.name) + stmt.setTimestamp(2, Timestamp.from(time)) + stmt.setInt(3, limit) + stmt.executeUpdate() + } } } @@ -105,12 +111,14 @@ class MysqlOutboxStore( val placeholders = statuses.joinToString(",") { "?" } val sql = "SELECT status, MIN(created_at) AS min_created_at FROM outbox WHERE status IN ($placeholders) GROUP BY status" - connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } - stmt.executeQuery().use { rs -> - while (rs.next()) { - val s = OutboxStatus.from(rs.getString("status")) - result[s] = rs.getTimestamp("min_created_at").toInstant() + connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } + stmt.executeQuery().use { rs -> + while (rs.next()) { + val s = OutboxStatus.from(rs.getString("status")) + result[s] = rs.getTimestamp("min_created_at").toInstant() + } } } } @@ -121,10 +129,12 @@ class MysqlOutboxStore( val sql = "SELECT status, COUNT(*) AS count FROM outbox GROUP BY status" val counts = mutableMapOf() - connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.executeQuery().use { rs -> - while (rs.next()) { - counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.executeQuery().use { rs -> + while (rs.next()) { + counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + } } } } diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index 40d7ad2..5890cd5 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -30,26 +30,28 @@ class PostgresOutboxStore( delivery_metadata = EXCLUDED.delivery_metadata """.trimIndent() - connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.setString(1, entry.outboxId.raw.toString()) - stmt.setString(2, entry.messageType) - stmt.setString(3, entry.payload) - stmt.setString(4, entry.deliveryType) - stmt.setString(5, entry.status.name) - stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) - stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) - stmt.setInt(8, entry.retries) - if (entry.lastAttempt != null) { - stmt.setTimestamp( - 9, - Timestamp.from(entry.lastAttempt), - ) - } else { - stmt.setNull(9, java.sql.Types.TIMESTAMP) + connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.setString(1, entry.outboxId.raw.toString()) + stmt.setString(2, entry.messageType) + stmt.setString(3, entry.payload) + stmt.setString(4, entry.deliveryType) + stmt.setString(5, entry.status.name) + stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) + stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) + stmt.setInt(8, entry.retries) + if (entry.lastAttempt != null) { + stmt.setTimestamp( + 9, + Timestamp.from(entry.lastAttempt), + ) + } else { + stmt.setNull(9, java.sql.Types.TIMESTAMP) + } + if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) + stmt.setString(11, entry.deliveryMetadata) + stmt.executeUpdate() } - if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) - stmt.setString(11, entry.deliveryMetadata) - stmt.executeUpdate() } return entry } @@ -63,11 +65,13 @@ class PostgresOutboxStore( FOR UPDATE SKIP LOCKED """.trimIndent() - return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.setString(1, OutboxStatus.PENDING.name) - stmt.setInt(2, limit) - stmt.executeQuery().use { rs -> - generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + return connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.PENDING.name) + stmt.setInt(2, limit) + stmt.executeQuery().use { rs -> + generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + } } } } @@ -86,11 +90,13 @@ class PostgresOutboxStore( ) """.trimIndent() - return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.setString(1, OutboxStatus.DELIVERED.name) - stmt.setTimestamp(2, Timestamp.from(time)) - stmt.setInt(3, limit) - stmt.executeUpdate() + return connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.DELIVERED.name) + stmt.setTimestamp(2, Timestamp.from(time)) + stmt.setInt(3, limit) + stmt.executeUpdate() + } } } @@ -99,12 +105,14 @@ class PostgresOutboxStore( val placeholders = statuses.joinToString(",") { "?" } val sql = "SELECT status, MIN(created_at) AS min_created_at FROM outbox WHERE status IN ($placeholders) GROUP BY status" - connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } - stmt.executeQuery().use { rs -> - while (rs.next()) { - val s = OutboxStatus.from(rs.getString("status")) - result[s] = rs.getTimestamp("min_created_at").toInstant() + connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } + stmt.executeQuery().use { rs -> + while (rs.next()) { + val s = OutboxStatus.from(rs.getString("status")) + result[s] = rs.getTimestamp("min_created_at").toInstant() + } } } } @@ -115,10 +123,12 @@ class PostgresOutboxStore( val sql = "SELECT status, COUNT(*) AS count FROM outbox GROUP BY status" val counts = mutableMapOf() - connectionProvider.getConnection().prepareStatement(sql).use { stmt -> - stmt.executeQuery().use { rs -> - while (rs.next()) { - counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + connectionProvider.withConnection { conn -> + conn.prepareStatement(sql).use { stmt -> + stmt.executeQuery().use { rs -> + while (rs.next()) { + counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + } } } } diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 2050d4b..7404539 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -10,7 +10,7 @@ dependencies { compileOnly(libs.springContext) compileOnly(libs.springTx) - implementation(libs.springJdbc) + compileOnly(libs.springJdbc) compileOnly(libs.springBootAutoconfigure) // Validation annotations for @ConfigurationProperties classes @@ -25,6 +25,7 @@ dependencies { testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) + testImplementation(libs.h2) testImplementation(libs.springContext) testImplementation(libs.springTx) testImplementation(libs.springJdbc) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 4cdb2e8..b5a9d22 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -42,7 +42,10 @@ import javax.sql.DataSource * If both are present, Postgres takes priority. Override by defining your own `@Bean OutboxStore`. * - [Clock] — defaults to [Clock.systemUTC] * - [RetryPolicy] — defaults to `maxRetries = 5` - * - [PlatformTransactionManager] — if absent, each store call runs in its own transaction + * - [PlatformTransactionManager] — when present, scheduler/purger wrap each tick in a Spring + * transaction. When absent, store calls run in JDBC auto-commit mode, which narrows + * `FOR UPDATE SKIP LOCKED` to the claim itself and allows duplicate delivery across + * processor instances; configure one for any multi-instance deployment. * * Multi-datasource support: * - Set `okapi.datasource-qualifier` to the bean name of the [DataSource] that holds the outbox table. diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt index 6e921dc..5563771 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt @@ -6,13 +6,20 @@ import java.sql.Connection import javax.sql.DataSource /** - * Spring-aware [ConnectionProvider] that retrieves the JDBC connection - * bound to the current Spring-managed transaction. + * Spring-aware [ConnectionProvider] backed by [DataSourceUtils]. * - * Uses [DataSourceUtils.getConnection] — the standard Spring mechanism - * that works transparently with any [org.springframework.transaction.PlatformTransactionManager]: - * JPA, JDBC, jOOQ, MyBatis, Exposed, etc. + * [DataSourceUtils.releaseConnection] is a no-op when the connection is bound to an active + * Spring transaction (Spring owns its lifecycle), and returns the connection to the pool + * otherwise. Without the release, each call outside a Spring transaction would leak a + * pooled connection. */ class SpringConnectionProvider(private val dataSource: DataSource) : ConnectionProvider { - override fun getConnection(): Connection = DataSourceUtils.getConnection(dataSource) + override fun withConnection(block: (Connection) -> T): T { + val connection = DataSourceUtils.getConnection(dataSource) + return try { + block(connection) + } finally { + DataSourceUtils.releaseConnection(connection, dataSource) + } + } } diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt index e7ae83c..93b61fc 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt @@ -72,7 +72,9 @@ class OutboxMysqlEndToEndTest : beforeEach { wiremock.resetAll() - jdbc.withTransaction { jdbc.getConnection().createStatement().use { it.execute("DELETE FROM outbox") } } + jdbc.withTransaction { + jdbc.withConnection { conn -> conn.createStatement().use { it.execute("DELETE FROM outbox") } } + } } given("a message published within a transaction") { @@ -199,8 +201,11 @@ class OutboxMysqlEndToEndTest : private class TestJdbcConnectionProvider(private val dataSource: javax.sql.DataSource) : ConnectionProvider { private val threadLocalConnection = ThreadLocal() - override fun getConnection(): java.sql.Connection = threadLocalConnection.get() - ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + override fun withConnection(block: (java.sql.Connection) -> T): T { + val connection = threadLocalConnection.get() + ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + return block(connection) + } fun withTransaction(block: () -> T): T { val conn = dataSource.connection diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProviderTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProviderTest.kt new file mode 100644 index 0000000..0f02444 --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProviderTest.kt @@ -0,0 +1,106 @@ +package com.softwaremill.okapi.springboot + +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import org.h2.jdbcx.JdbcDataSource +import org.springframework.jdbc.datasource.ConnectionHolder +import org.springframework.transaction.support.TransactionSynchronizationManager +import java.sql.Connection +import java.util.concurrent.atomic.AtomicInteger +import javax.sql.DataSource + +class SpringConnectionProviderTest : FunSpec({ + + val h2 = JdbcDataSource().apply { + setURL("jdbc:h2:mem:okapi-spring-conn-provider;DB_CLOSE_DELAY=-1") + user = "sa" + password = "" + } + + afterEach { + if (TransactionSynchronizationManager.isSynchronizationActive()) { + TransactionSynchronizationManager.clearSynchronization() + } + } + + test("releases a pool-borrowed connection when called outside a Spring transaction") { + val counter = CountingDataSource(h2) + val provider = SpringConnectionProvider(counter) + + provider.withConnection { /* no-op */ } + + counter.opened.get() shouldBe 1 + counter.closed.get() shouldBe 1 + } + + test("never leaks when called repeatedly outside a Spring transaction") { + val counter = CountingDataSource(h2) + val provider = SpringConnectionProvider(counter) + + repeat(50) { provider.withConnection { /* no-op */ } } + + counter.opened.get() shouldBe 50 + counter.closed.get() shouldBe 50 + } + + test("still releases the connection when the block throws") { + val counter = CountingDataSource(h2) + val provider = SpringConnectionProvider(counter) + + runCatching { provider.withConnection { error("boom") } } + + counter.opened.get() shouldBe 1 + counter.closed.get() shouldBe 1 + } + + test("reuses the transaction-bound connection and does not close it") { + val counter = CountingDataSource(h2) + val bound = counter.connection + val openedBefore = counter.opened.get() + val closedBefore = counter.closed.get() + + val holder = ConnectionHolder(bound).apply { requested() } + TransactionSynchronizationManager.initSynchronization() + TransactionSynchronizationManager.bindResource(counter, holder) + try { + val provider = SpringConnectionProvider(counter) + var passed: Connection? = null + provider.withConnection { passed = it } + + passed shouldBe bound + counter.opened.get() shouldBe openedBefore + counter.closed.get() shouldBe closedBefore + bound.isClosed shouldBe false + } finally { + TransactionSynchronizationManager.unbindResource(counter) + bound.close() + } + } +}) + +private class CountingDataSource(private val delegate: DataSource) : DataSource by delegate { + val opened = AtomicInteger(0) + val closed = AtomicInteger(0) + + override fun getConnection(): Connection { + val connection = delegate.connection + try { + val wrapped = CountingConnection(connection, closed) + opened.incrementAndGet() + return wrapped + } catch (t: Throwable) { + connection.close() + throw t + } + } +} + +private class CountingConnection( + private val delegate: Connection, + private val closed: AtomicInteger, +) : Connection by delegate { + override fun close() { + delegate.close() + closed.incrementAndGet() + } +}