-
Notifications
You must be signed in to change notification settings - Fork 38
/
writer.go
133 lines (123 loc) · 2.67 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package afs
import (
"context"
"github.com/viant/afs/option"
"github.com/viant/afs/storage"
"io"
"os"
"sync"
"sync/atomic"
)
// NewWriter returns an io.WriteCloser whose content is uploaded to URL.
//
// When the storage manager resolved for URL implements storage.WriterProvider
// the call is delegated to that provider. Otherwise a pipe-backed writer is
// returned that uploads through s with the supplied mode and options. The
// Allowed flag of the option.Empty value is copied to the writer's allowEmpty
// field; option.Assign is given the chance to replace that value from options.
// An error is returned when the manager for URL cannot be resolved.
func (s *service) NewWriter(ctx context.Context, URL string, mode os.FileMode, options ...storage.Option) (io.WriteCloser, error) {
	emptyOpt := &option.Empty{}
	option.Assign(options, &emptyOpt)

	mgr, err := s.manager(ctx, URL, options)
	if err != nil {
		return nil, err
	}

	// Prefer a native writer when the backing manager offers one.
	provider, isProvider := mgr.(storage.WriterProvider)
	if isProvider {
		return provider.NewWriter(ctx, URL, mode, options...)
	}

	// opened and err are intentionally left at their zero values.
	result := &writer{
		ctx:         ctx,
		url:         URL,
		mode:        mode,
		options:     options,
		uploader:    s,
		allowEmpty:  emptyOpt.Allowed,
		doneChannel: make(chan bool),
	}
	return result, nil
}
// writer is an io.WriteCloser that streams written bytes to an upload running
// in a background goroutine.
//
// Bytes passed to Write go into an io.Pipe whose read side is handed to
// uploader.Upload; Close closes the pipe and waits on doneChannel for the
// upload goroutine to finish.
type writer struct {
	// written counts the bytes accepted by the pipe. It is accessed with
	// sync/atomic and therefore must stay the first field: on 32-bit
	// platforms only the first word of an allocated struct is guaranteed to be
	// 64-bit aligned, and a misaligned 64-bit atomic operation panics.
	written int64

	// ctx is passed to Upload and watched for cancellation.
	ctx context.Context
	// url is the upload destination.
	url string
	// mode and options are forwarded unchanged to Upload.
	mode    os.FileMode
	options []storage.Option
	// uploader performs the actual upload from the pipe's read side.
	uploader storage.Uploader

	// mutex guards err.
	mutex sync.RWMutex
	// err holds the most recent error recorded through setError.
	err error

	// opened reports whether open has created the pipe and started the upload.
	opened bool
	// allowEmpty makes Close run the upload even when nothing was written.
	allowEmpty bool
	// writer is the write side of the pipe; it is nil until open is called.
	writer *io.PipeWriter
	// doneChannel is closed when the upload goroutine finishes.
	doneChannel chan bool
}
// open creates the pipe and starts the background upload.
//
// The write side of a fresh io.Pipe is stored on w and w is marked as opened.
// Two goroutines are started: monitorCancel, and one that runs
// uploader.Upload with the read side of the pipe. open itself always returns
// nil; an upload failure is recorded with setError.
func (w *writer) open() error {
	pipeReader, pipeWriter := io.Pipe()
	w.writer = pipeWriter
	w.opened = true
	go w.monitorCancel()
	go func() {
		// Closing doneChannel is the last step so that anyone waiting on it
		// observes the recorded error.
		defer close(w.doneChannel)
		err := w.uploader.Upload(w.ctx, w.url, w.mode, pipeReader, w.options...)
		if err != nil {
			w.setError(err)
		}
		// Always release the read side. If Upload returned without draining
		// the pipe, a concurrent or later Write would otherwise block forever;
		// with the reader closed it fails with err (io.ErrClosedPipe when err
		// is nil). CloseWithError always returns nil.
		_ = pipeReader.CloseWithError(err)
	}()
	return nil
}
// Write forwards p to the upload pipe.
//
// A previously recorded error is returned immediately without writing. The
// pipe and the background upload are started on the first call. The number of
// bytes the pipe accepted is added to the written counter, and any error from
// the pipe is recorded before being returned to the caller.
func (w *writer) Write(p []byte) (n int, err error) {
	if recorded := w.error(); recorded != nil {
		return 0, recorded
	}
	if !w.opened {
		if openErr := w.open(); openErr != nil {
			return 0, openErr
		}
	}

	n, err = w.writer.Write(p)
	atomic.AddInt64(&w.written, int64(n))
	if err != nil {
		// Remember the failure so that subsequent writes fail fast.
		w.setError(err)
	}
	return n, err
}
// Close finishes the upload and reports its outcome.
//
// When no bytes were written and empty uploads are not allowed, nothing is
// uploaded and nil is returned; an upload that a zero-length Write already
// started is aborted first. Otherwise the pipe is opened if necessary, its
// write side is closed so the uploader sees EOF, and Close waits for the
// upload goroutine before returning the recorded error, if any.
//
// Close never closes doneChannel itself (only the upload goroutine does), so
// calling Close more than once does not panic.
func (w *writer) Close() error {
	if atomic.LoadInt64(&w.written) == 0 && !w.allowEmpty {
		if !w.opened {
			// Nothing was started, so there is nothing to release.
			return nil
		}
		// A zero-length Write started the upload. Fail the read side rather
		// than signalling EOF so that no empty object is uploaded, then wait
		// for the upload goroutine so it is not leaked.
		_ = w.writer.CloseWithError(io.ErrClosedPipe)
		<-w.doneChannel
		return nil
	}
	if !w.opened {
		if err := w.open(); err != nil {
			return err
		}
	}
	// Closing the write side makes the uploader's reads return EOF.
	if err := w.writer.Close(); err != nil {
		return err
	}
	<-w.doneChannel
	// Read through the accessor: monitorCancel may still record a
	// cancellation error concurrently.
	return w.error()
}
// monitorCancel blocks until either the writer's context is done or
// doneChannel yields (it is closed when the upload goroutine finishes). In the
// former case the context's error is recorded on the writer.
func (w *writer) monitorCancel() {
	cancelled := w.ctx.Done()
	select {
	case <-cancelled:
		w.setError(w.ctx.Err())
	case <-w.doneChannel:
		return
	}
}
// error returns the currently recorded error, or nil when none has been set.
// The read is performed under the read lock.
func (w *writer) error() error {
	w.mutex.RLock()
	defer w.mutex.RUnlock()
	return w.err
}
// setError records err on the writer under the write lock, replacing any
// previously recorded error. A nil err is ignored.
func (w *writer) setError(err error) {
	if err != nil {
		w.mutex.Lock()
		defer w.mutex.Unlock()
		w.err = err
	}
}