-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
python.go
227 lines (192 loc) · 6.53 KB
/
python.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package pytasks
import (
"errors"
"runtime"
"sync"
"sync/atomic"
"github.com/DataDog/go-python3"
)
var singletonOnce sync.Once
var pythonSingletonInstance *pythonSingleton
type Tuple struct {
Result interface{}
Err error
}
// PythonSingleton is an interface to the pythonSingleton instance
type PythonSingleton interface {
ImportModule(name string) (*python3.PyObject, error)
NewTask(task func()) (*sync.WaitGroup, error)
NewTaskSync(task func()) error
Finalize() error
}
type pythonSingleton struct {
taskWG sync.WaitGroup
stopWG sync.WaitGroup
stoppedWG sync.WaitGroup
stopped int32
stopOnce sync.Once
}
type PythonSingletonOption func(ps *pythonSingleton) error
func WithModules(modules []string) PythonSingletonOption {
return func(ps *pythonSingleton) error {
for _, m := range modules {
_, err := ps.ImportModule(m)
if err != nil {
return err
}
}
return nil
}
}
// GetPythonSingleton returns the existing pythonSingleton
// or creates a new one if one has not been created yet.
func GetPythonSingleton(opts ...PythonSingletonOption) PythonSingleton {
singletonOnce.Do(func() {
ps := &pythonSingleton{}
tuple := ps.initPython(opts...)
startedWG := tuple.Result.(*sync.WaitGroup)
startedWG.Wait()
pythonSingletonInstance = ps
})
return pythonSingletonInstance
}
// This is a special case. The thread that inits python needs to also shut it down.
// This function returns a WaitGroup that should be waited on for startup to finish.
func (ps *pythonSingleton) initPython(opts ...PythonSingletonOption) *Tuple {
var startedWG sync.WaitGroup
startedWG.Add(1)
ps.stoppedWG.Add(1)
ps.stopWG.Add(1)
result := &Tuple{
Result: &startedWG,
}
go func() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// The following will also create the GIL explicitly
// by calling PyEval_InitThreads(), without waiting
// for the interpreter to do that
python3.Py_Initialize()
if !python3.Py_IsInitialized() {
result.Err = errors.New("Error initializing the python interpreter")
return
}
// https://stackoverflow.com/questions/27844676/assertionerror-3-x-only-when-calling-py-finalize-with-threads
threadingMod := python3.PyImport_ImportModule("threading")
threadingMod.DecRef()
for _, opt := range opts {
err := opt(ps)
if err != nil {
result.Err = err
return
}
}
// Initialize() has locked the the GIL but at this point we don't need it
// anymore. We save the current state and release the lock
// so that goroutines can acquire it
state := python3.PyEval_SaveThread()
// Trigger startedWG.
startedWG.Done()
// Wait until finalize is triggered for the singleton in this thread.
ps.stopWG.Wait()
// At this point we know we won't need Python anymore in this
// program, we can restore the state and lock the GIL to perform
// the final operations before exiting.
python3.PyEval_RestoreThread(state)
python3.Py_Finalize()
ps.stoppedWG.Done()
}()
return result
}
// NewTask creates a new task that will run in python.
// This returns a WaitGroup that will release when it is ready to continue processing and use it's return value
// meaning the Python GIL has been released.
func (ps *pythonSingleton) NewTask(task func()) (*sync.WaitGroup, error) {
if atomic.LoadInt32(&ps.stopped) == 1 {
return nil, errors.New("Finalize has been called on PythonSingleton. No new operations")
}
return ps.newTask(task)
}
// newTask will start a task without a lock. This is used internally by public NewTask and ImportModule.
//
// When a goroutine starts, it’s scheduled for execution on one of the GOMAXPROCS
// threads available—see here for more details on the topic. If a goroutine happens
// to perform a syscall or call C code, the current thread hands over the other
// goroutines waiting to run in the thread queue to another thread so they can have
// better chances to run; the current goroutine is paused, waiting for the syscall or
// the C function to return. When this happens, the thread tries to resume the paused
// goroutine, but if this is not possible, it asks the Go runtime to find another
// thread to complete the goroutine and goes to sleep. The goroutine is finally
// scheduled to another thread and it finishes.
//
// 1. Our goroutine starts, performs a C call, and pauses. The GIL is locked.
// 2. When the C call returns, the current thread tries to resume the goroutine, but it fails.
// 3. The current thread tells the Go runtime to find another thread to resume our goroutine.
// 4. The Go scheduler finds an available thread and the goroutine is resumed.
// 5. The goroutine is almost done and tries to unlock the GIL before returning.
// 6. The thread ID stored in the current state is from the original thread and is different from the ID of the current thread.
// 7. Panic!
func (ps *pythonSingleton) newTask(task func()) (*sync.WaitGroup, error) {
var wg sync.WaitGroup
wg.Add(1)
ps.taskWG.Add(1)
go func() {
defer ps.taskWG.Done()
defer wg.Done()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
_gstate := python3.PyGILState_Ensure()
defer python3.PyGILState_Release(_gstate)
// save the response from the task
task()
}()
return &wg, nil
}
// Finalize trigger the thread that started python to stop it.
// It will block until Python has stopped.
func (ps *pythonSingleton) Finalize() error {
// Wait for any tasks to finish
ps.taskWG.Wait()
if atomic.SwapInt32(&ps.stopped, 1) == 1 {
return errors.New("Finalize already called on PythonSingleton")
}
ps.stopOnce.Do(func() {
go func() {
ps.stopWG.Done()
}()
ps.stoppedWG.Wait()
})
return nil
}
// ImportModule will import the python module and add it
// to the registry or return an already imported module.
func (ps *pythonSingleton) ImportModule(name string) (*python3.PyObject, error) {
if atomic.LoadInt32(&ps.stopped) == 1 {
return nil, errors.New("Finalize has been called on PythonSingleton. No new operations")
}
var module *python3.PyObject
var err error
taskWG, taskErr := ps.newTask(func() {
// module = python3.PyImport_ImportModule(name)
pyName := python3.PyUnicode_FromString(name)
module = python3.PyImport_Import(pyName)
if module == nil {
python3.PyErr_PrintEx(false)
err = errors.New("error importing module")
}
})
if taskErr != nil {
return nil, taskErr
}
taskWG.Wait()
return module, err
}
// NewTaskSync calls NewTask and waits for the task to finish before returning.
func (ps *pythonSingleton) NewTaskSync(task func()) error {
taskWG, taskErr := ps.newTask(task)
if taskErr != nil {
return taskErr
}
taskWG.Wait()
return nil
}