Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s String) Get(r *record.Record) (string, error) {
return resolveMacro, nil
}

return evaluateContext(resolveMacro, r)
return evaluateMeta(resolveMacro, r)

}

Expand Down Expand Up @@ -63,7 +63,7 @@ func Load(configFile string, obj interface{}) error {
"env": getEnvironmentVariable, // returns environment variable
"macro": setMacroPlaceholder, // set placeholder string for macro replacement
"secret": getSecret, // we use this template function to inject secrets from parameter store
"context": setContextPlaceholder, // set placeholder string for context replacement
"context": SetMetaPlaceholder, // set placeholder string for context replacement. Maintaining "context" as a template function name for backward compatibility
// indent: add `n` spaces after every newline in the value (useful when
// injecting multiline values into YAML block scalars)
"indent": func(n int, v string) string {
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/config/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ var (
)

// return placeholder string for context key
func setContextPlaceholder(key string) (string, error) {
func SetMetaPlaceholder(key string) (string, error) {
return fmt.Sprintf(contextPlaceholderString, key), nil
}

func evaluateContext(data string, record *record.Record) (string, error) {
func evaluateMeta(data string, record *record.Record) (string, error) {

// Find all context template patterns
matches := contextTemplateRegex.FindAllStringSubmatch(data, -1)
Expand All @@ -45,7 +45,7 @@ func evaluateContext(data string, record *record.Record) (string, error) {
key := match[1]

// Get the context value
value, ok := record.GetContextValue(key)
value, ok := record.GetMetaValue(key)
if !ok {
missingKeys = append(missingKeys, key)
continue
Expand Down
12 changes: 9 additions & 3 deletions internal/pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
"context"
"fmt"
"sync"

Expand All @@ -22,6 +23,8 @@ type Pipeline struct {
wg *sync.WaitGroup
locker *sync.Mutex
errors map[string]error
ctx context.Context
cancel context.CancelFunc
}

func (p *Pipeline) Init() error {
Expand Down Expand Up @@ -61,6 +64,9 @@ func (p *Pipeline) Run() error {
p.ChannelSize = defaultChannelSize
}

// Create pipeline-level context for cancellation
p.ctx, p.cancel = context.WithCancel(context.Background())

// sync
if p.DAG == nil {
// data streams
Expand Down Expand Up @@ -237,18 +243,18 @@ func (p *Pipeline) runTaskConcurrently(t task.Task, input <-chan *record.Record,
taskWg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func(task task.Task, in <-chan *record.Record, out chan<- *record.Record) {
go func(ctx context.Context, task task.Task, in <-chan *record.Record, out chan<- *record.Record) {
defer taskWg.Done()

if err := task.Run(in, out); err != nil {
if err := task.Run(ctx, in, out); err != nil {
fmt.Printf("error in %s: %s\n", task.GetName(), err)
if task.GetFailOnError() {
p.locker.Lock()
p.errors[task.GetName()] = err
p.locker.Unlock()
}
}
}(t, input, output)
}(p.ctx, t, input, output)
}

go func(wg *sync.WaitGroup, out chan<- *record.Record) {
Expand Down
27 changes: 0 additions & 27 deletions internal/pkg/pipeline/record/context.go

This file was deleted.

16 changes: 16 additions & 0 deletions internal/pkg/pipeline/record/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package record

func (r *Record) SetMetaValue(key string, value string) {
if r.Meta == nil {
r.Meta = make(map[string]string)
}
r.Meta[key] = value
}

func (r *Record) GetMetaValue(key string) (string, bool) {
if r.Meta == nil {
return "", false
}
v, ok := r.Meta[key]
return v, ok
}
9 changes: 4 additions & 5 deletions internal/pkg/pipeline/record/record.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package record

import (
"context"
"encoding/json"
)

type Record struct {
ID int `yaml:"id,omitempty" json:"id,omitempty"`
Origin string `yaml:"origin,omitempty" json:"origin,omitempty"`
Data []byte `yaml:"data,omitempty" json:"data,omitempty"`
Context context.Context `yaml:"-" json:"-"`
ID int `yaml:"id,omitempty" json:"id,omitempty"`
Origin string `yaml:"origin,omitempty" json:"origin,omitempty"`
Data []byte `yaml:"data,omitempty" json:"data,omitempty"`
Meta map[string]string `yaml:"meta,omitempty" json:"meta,omitempty"`
}

func (m Record) MarshalJSON() ([]byte, error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/pipeline/task/archive/archive.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package archive

import (
"context"
"fmt"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (c *core) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

if input == nil {
return task.ErrNilInput
Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/pipeline/task/archive/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (t *tarArchive) Read() {
if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF {
log.Fatal(err)
}
rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(header.Name))
t.SendData(rc.Context, buf, t.OutputChan)
rc.SetMetaValue(task.MetaKeyArchiveFileNameWrite, filepath.Base(header.Name))
t.SendData(rc.Meta, buf, t.OutputChan)
}

}
Expand All @@ -73,7 +73,7 @@ func (t *tarArchive) Write() {
continue
}

filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite))
filePath, found := rec.GetMetaValue(task.MetaKeyFileNameWrite)
if !found {
log.Fatal("filepath not set in context")
}
Expand All @@ -97,12 +97,12 @@ func (t *tarArchive) Write() {
log.Fatal(err)
}

rc.Context = rec.Context
rc.Meta = rec.Meta
}

if err := tw.Close(); err != nil {
log.Fatal(err)
}

t.SendData(rc.Context, buf.Bytes(), t.OutputChan)
t.SendData(rc.Meta, buf.Bytes(), t.OutputChan)
}
10 changes: 5 additions & 5 deletions internal/pkg/pipeline/task/archive/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (z *zipArchive) Read() {
// check the file type is regular file
if f.FileInfo().Mode().IsRegular() {

rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(f.Name))
rc.SetMetaValue(task.MetaKeyArchiveFileNameWrite, filepath.Base(f.Name))

fs, err := f.Open()
if err != nil {
Expand All @@ -55,7 +55,7 @@ func (z *zipArchive) Read() {

fs.Close()

z.SendData(rc.Context, buf, z.OutputChan)
z.SendData(rc.Meta, buf, z.OutputChan)
}
}
}
Expand All @@ -73,7 +73,7 @@ func (z *zipArchive) Write() {
break
}

filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite))
filePath, found := rec.GetMetaValue(task.MetaKeyFileNameWrite)
if !found {
log.Fatal("filepath not set in context")
}
Expand All @@ -93,14 +93,14 @@ func (z *zipArchive) Write() {
log.Fatal(err)
}

rc.Context = rec.Context
rc.Meta = rec.Meta
}

if err := zipWriter.Close(); err != nil {
log.Fatal(err)
}

// Send the complete ZIP archive
z.SendData(rc.Context, zipBuf.Bytes(), z.OutputChan)
z.SendData(rc.Meta, zipBuf.Bytes(), z.OutputChan)

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (p *parameterStore) Init() error {
return nil
}

func (p *parameterStore) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (p *parameterStore) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

for {
r, ok := p.GetRecord(input)
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/pipeline/task/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package compress

import (
"bytes"
"context"
"fmt"
"io"

Expand Down Expand Up @@ -47,7 +48,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (c *core) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

if input == nil {
return task.ErrNilInput
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (e
}

if output != nil {
c.SendData(r.Context, transformedData, output)
c.SendData(r.Meta, transformedData, output)
}
}

Expand Down
7 changes: 4 additions & 3 deletions internal/pkg/pipeline/task/converter/converter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package converter

import (
"context"
"fmt"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {

}

func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) error {
func (c *core) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {

for {
r, ok := c.GetRecord(input)
Expand All @@ -83,10 +84,10 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) er
if out.Data != nil {
// Add metadata to context
for k, v := range out.Metadata {
r.SetContextValue(k, v)
r.SetMetaValue(k, v)
}

c.SendData(r.Context, out.Data, output)
c.SendData(r.Meta, out.Data, output)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/pipeline/task/delay/delay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package delay

import (
"context"
"time"

"github.com/patterninc/caterpillar/internal/pkg/duration"
Expand All @@ -23,7 +24,7 @@ func New() (task.Task, error) {
}, nil
}

func (d *delay) Run(input <-chan *record.Record, output chan<- *record.Record) error {
func (d *delay) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {

for {
r, ok := d.GetRecord(input)
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/pipeline/task/echo/echo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package echo

import (
"context"
"fmt"
"time"

Expand All @@ -21,7 +22,7 @@ func New() (task.Task, error) {
return &echo{}, nil
}

func (e *echo) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (e *echo) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

for {
r, ok := e.GetRecord(input)
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/pipeline/task/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func New() (task.Task, error) {
}, nil
}

func (f *file) Run(input <-chan *record.Record, output chan<- *record.Record) error {
func (f *file) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {

// let's check if we read file or we write file...
if input != nil && output != nil {
Expand Down Expand Up @@ -134,12 +134,12 @@ func (f *file) readFile(output chan<- *record.Record) error {
return err
}

// Create a default record with context
rc := &record.Record{Context: ctx}
rc.SetContextValue(string(task.CtxKeyFileNameWrite), filepath.Base(path))
// create a default record with file name in context
rc := &record.Record{}
rc.SetMetaValue(task.MetaKeyFileNameWrite, filepath.Base(path))

// let's write content to output channel
f.SendData(rc.Context, content, output)
f.SendData(rc.Meta, content, output)

}

Expand Down Expand Up @@ -174,7 +174,7 @@ func (f *file) writeFile(input <-chan *record.Record) error {
var fs file

fs = *f
filePath, found := rc.GetContextValue(string(task.CtxKeyArchiveFileNameWrite))
filePath, found := rc.GetMetaValue(task.MetaKeyArchiveFileNameWrite)
if found {
if filePath == "" {
log.Fatal("required file path")
Expand Down
Loading