Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
version: "2"
run:
tests: true
linters:
enable:
- errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- unused
- gofmt
- goimports
- misspell
- unconvert
- unparam
- whitespace

linters-settings:
errcheck:
check-type-assertions: true
check-blank: true

govet:
check-shadowing: true

run:
timeout: 5m
tests: true

settings:
errcheck:
check-type-assertions: true
check-blank: true
govet:
enable:
- shadow
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
issues:
exclude-use-default: false
max-issues-per-linter: 0
max-same-issues: 0
formatters:
enable:
- gofmt
- goimports
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
5 changes: 5 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ tasks:
cmds:
- golangci-lint run ./...

lint-fix:
desc: Run linter
cmds:
- golangci-lint run ./... --fix

fmt:
desc: Format code
cmds:
Expand Down
102 changes: 101 additions & 1 deletion cmd/smallflow/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,107 @@
package main

import "fmt"
import (
"context"
"fmt"
orchestrator2 "github.com/morebec/smallflow/internal/application/orchestrator"
"github.com/morebec/smallflow/internal/business/workflowmgmt"
"github.com/morebec/smallflow/internal/integration/postgres"
"time"

"github.com/morebec/go-misas/misas"
"github.com/morebec/go-misas/mpostgres"
"github.com/morebec/go-misas/muuid"
"github.com/morebec/go-misas/mx"
)

func main() {
fmt.Println("Build, run, and observe workflows without the overhead!")

clock := mx.NewRealTimeClock(time.UTC)

dbConn, err := mpostgres.OpenConn("postgres://smallflow:smallflow@localhost:5432/postgres?sslmode=disable")
Copy link

Copilot AI Oct 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection string targets the 'postgres' database, but compose config sets POSTGRES_DB=smallflow. Align the DSN to use the 'smallflow' database to avoid auth/permission mismatches (e.g., postgres://smallflow:smallflow@localhost:5432/smallflow?sslmode=disable).

Suggested change
dbConn, err := mpostgres.OpenConn("postgres://smallflow:smallflow@localhost:5432/postgres?sslmode=disable")
dbConn, err := mpostgres.OpenConn("postgres://smallflow:smallflow@localhost:5432/smallflow?sslmode=disable")

Copilot uses AI. Check for mistakes.
if err != nil {
panic(err)
}

var eventStore misas.EventStore
eventStore, err = mpostgres.NewEventStore(clock, dbConn)
if err != nil {
panic(err)
}

mx.EventRegistry.Register(workflowmgmt.WorkflowEnabledEventTypeName, workflowmgmt.WorkflowEnabledEvent{})
mx.EventRegistry.Register(workflowmgmt.WorkflowDisabledEventTypeName, workflowmgmt.WorkflowDisabledEvent{})
mx.EventRegistry.Register(workflowmgmt.WorkflowTriggeredEventTypeName, workflowmgmt.WorkflowTriggeredEvent{})
mx.EventRegistry.Register(workflowmgmt.WorkflowStartedEventTypeName, workflowmgmt.WorkflowStartedEvent{})
mx.EventRegistry.Register(workflowmgmt.WorkflowEndedEventTypeName, workflowmgmt.WorkflowEndedEvent{})
mx.EventRegistry.Register(workflowmgmt.StepStartedEventTypeName, workflowmgmt.StepStartedEvent{})
mx.EventRegistry.Register(workflowmgmt.StepEndedEventTypeName, workflowmgmt.StepEndedEvent{})

eventStore = mx.NewEventStoreDeserializerDecorator(eventStore)

workflowRepo := &postgres.EventStoreWorkflowRepository{
EventStore: eventStore,
UUIDGenerator: muuid.NewRandomUUIDGenerator(),
}
runRepo := &postgres.EventStoreRunRepository{
EventStore: eventStore,
UUIDGenerator: muuid.NewRandomUUIDGenerator(),
}

api := workflowmgmt.NewSubsystem(clock, workflowRepo, runRepo, muuid.NewRandomUUIDGenerator()).API
workflowLeaseRepository, err := postgres.NewWorkflowLeaseRepository(dbConn)
if err != nil {
panic(err)
}

leaseManager := orchestrator2.WorkflowLeaseManager{
Clock: clock,
Repository: workflowLeaseRepository,
}

checkpointStore, err := mpostgres.NewPostgreSQLCheckpointStore(dbConn)
if err != nil {
panic(err)
}
orch := orchestrator2.NewWorkflowOrchestrator(
clock,
api,
leaseManager,
muuid.NewRandomUUIDGenerator(),
eventStore,
checkpointStore,
)
orch.Start()
defer orch.Stop()

ctx := context.Background()

fmt.Println("Enabling workflow...")
if result := api.HandleCommand(ctx, workflowmgmt.EnableWorkflowCommand{
WorkflowID: "my-workflow",
}); result.Error != nil {
panic(result.Error)
}

for i := range 1 {
fmt.Printf("Triggering workflow #%d...\n", i+1)
if result := api.HandleCommand(ctx, workflowmgmt.TriggerWorkflowCommand{
WorkflowID: "my-workflow",
RunID: muuid.NewRandomUUIDGenerator().Generate().String(),
}); result.Error != nil {
panic(result.Error)
}
}

<-time.After(30 * time.Second)
fmt.Println("Current events in the event store:")
stream, err := eventStore.ReadFromStream(ctx, eventStore.GlobalStreamID(), misas.ReadFromEventStreamOptions{}.FromStart().Forward())
if err != nil {
panic(err)
}

for i, event := range stream.Events {
fmt.Printf("Event %d: %T → %+v\n", i, event, event)
}
}
8 changes: 0 additions & 8 deletions compose.debug.yaml

This file was deleted.

26 changes: 20 additions & 6 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
services:
smallflow:
image: smallflow
build:
context: .
dockerfile: ./Dockerfile
# smallflow:
# image: smallflow
# build:
# context: .
# dockerfile: ./Dockerfile
# ports:
# - 3000:3000

postgres:
image: postgres:18-alpine
environment:
POSTGRES_USER: smallflow
POSTGRES_PASSWORD: smallflow
POSTGRES_DB: smallflow
ports:
- 3000:3000
- "5432:5432"
volumes:
- postgres:/var/lib/postgresql/data

volumes:
postgres:
16 changes: 16 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
module github.com/morebec/smallflow

go 1.24.8

replace github.com/morebec/go-misas => ./../go-misas-back

require (
github.com/alitto/pond/v2 v2.5.0
github.com/morebec/go-misas v0.0.0-00010101000000-000000000000
github.com/samber/lo v1.49.1
)

require (
github.com/google/uuid v1.6.0 // indirect
github.com/lib/pq v1.10.9 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
)
49 changes: 49 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/alitto/pond/v2 v2.5.0 h1:vPzS5GnvSDRhWQidmj2djHllOmjFExVFbDGCw1jdqDw=
github.com/alitto/pond/v2 v2.5.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE=
github.com/cucumber/gherkin/go/v26 v26.2.0 h1:EgIjePLWiPeslwIWmNQ3XHcypPsWAHoMCz/YEBKP4GI=
github.com/cucumber/gherkin/go/v26 v26.2.0/go.mod h1:t2GAPnB8maCT4lkHL99BDCVNzCh1d7dBhCLt150Nr/0=
github.com/cucumber/godog v0.15.1 h1:rb/6oHDdvVZKS66hrhpjFQFHjthFSrQBCOI1LwshNTI=
github.com/cucumber/godog v0.15.1/go.mod h1:qju+SQDewOljHuq9NSM66s0xEhogx0q30flfxL4WUk8=
github.com/cucumber/messages/go/v21 v21.0.1 h1:wzA0LxwjlWQYZd32VTlAVDTkW6inOFmSM+RuOwHZiMI=
github.com/cucumber/messages/go/v21 v21.0.1/go.mod h1:zheH/2HS9JLVFukdrsPWoPdmUtmYQAQPLk7w5vWsk5s=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v4.3.1+incompatible h1:0/KbAdpx3UXAx1kEOWHJeOkpbgRFGHVgv+CFIY7dBJI=
github.com/gofrs/uuid v4.3.1+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-memdb v1.3.4 h1:XSL3NR682X/cVk2IeV0d70N4DZ9ljI885xAEU8IoK3c=
github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew=
github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o=
github.com/sergi/go-diff v1.4.0 h1:n/SP9D5ad1fORl+llWyN+D6qoUETXNZARKjyY2/KVCw=
github.com/sergi/go-diff v1.4.0/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4=
github.com/spf13/pflag v1.0.7 h1:vN6T9TfwStFPFM5XzjsvmzZkLuaLX+HS+0SeFLRgU6M=
github.com/spf13/pflag v1.0.7/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA=
github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg=
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M=
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
73 changes: 73 additions & 0 deletions internal/application/orchestrator/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package orchestrator

import (
"context"
"time"

"github.com/morebec/go-misas/misas"
)

const defaultLeaseDuration = 5 * time.Minute

type WorkflowLease struct {
WorkflowID string
RunID string
expiresAt time.Time
}

func (wl WorkflowLease) IsExpired(currentTime time.Time) bool { return currentTime.After(wl.expiresAt) }

type WorkflowLeaseRepository interface {
Add(context.Context, WorkflowLease) error
Update(context.Context, WorkflowLease) error
Remove(ctx context.Context, workflowID string, runID string) error
FindByWorkflowRunID(ctx context.Context, workflowID string, runID string) (*WorkflowLease, error)
}

type WorkflowLeaseManager struct {
Clock misas.Clock
Repository WorkflowLeaseRepository
}

func (l WorkflowLeaseManager) TryAcquire(ctx context.Context, workflowID string, runID string) (bool, error) {
lease, err := l.Repository.FindByWorkflowRunID(ctx, workflowID, runID)
if err != nil {
return false, err
}

if lease != nil && !lease.IsExpired(l.Clock.Now()) {
// Lease is still valid
return false, nil
}

wl := WorkflowLease{
WorkflowID: workflowID,
RunID: runID,
expiresAt: l.Clock.Now().Add(defaultLeaseDuration),
}
if err := l.Repository.Add(ctx, wl); err != nil {
return false, err
}

return true, nil
}

func (l WorkflowLeaseManager) Release(ctx context.Context, workflowID string, runID string) error {
if err := l.Repository.Remove(ctx, workflowID, runID); err != nil {
return err
}

return nil
}

func (l WorkflowLeaseManager) RenewLease(ctx context.Context, workflowID string, runID string) error {
if err := l.Repository.Update(ctx, WorkflowLease{
WorkflowID: workflowID,
RunID: runID,
expiresAt: l.Clock.Now().Add(defaultLeaseDuration),
}); err != nil {
return err
}

return nil
}
Loading