diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go index a4e177a9f14..bc3088bbb44 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go @@ -26,7 +26,6 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" - vttablet "vitess.io/vitess/go/vt/vttablet/common" ) // isBitSet returns true if the bit at index is set @@ -47,13 +46,11 @@ func setBit(data []byte, index int, value bool) { } func (tp *TablePlan) isPartial(rowChange *binlogdatapb.RowChange) bool { - if (tp.WorkflowConfig.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage) == 0 || - rowChange.DataColumns == nil || - rowChange.DataColumns.Count == 0 { - + if rowChange == nil { return false } - return true + return (rowChange.DataColumns != nil && rowChange.DataColumns.Count > 0) || + (rowChange.JsonPartialValues != nil && rowChange.JsonPartialValues.Count > 0) } func (tpb *tablePlanBuilder) generatePartialValuesPart(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter, dataColumns *binlogdatapb.RowChange_Bitmap) *sqlparser.ParsedQuery { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index fe96e47281c..1267ad5e20c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1560,8 +1560,24 @@ func TestPlayerPartialImages(t *testing.T) { error string } - testCases := []testCase{ - { + var testCases []testCase + + if vttablet.DefaultVReplicationConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching == 0 { + testCases = append(testCases, testCase{ + input: "insert into src (id, jd, bd) values (1,'{\"key1\": \"val1\"}','blob data'), (2,'{\"key2\": \"val2\"}','blob data2'), (3,'{\"key3\": \"val3\"}','blob data3')", + output: []string{ + "insert into dst(id,jd,bd) values (1,JSON_OBJECT(_utf8mb4'key1', _utf8mb4'val1'),_binary'blob data')", + "insert into dst(id,jd,bd) values (2,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'blob data2')", + "insert into dst(id,jd,bd) values (3,JSON_OBJECT(_utf8mb4'key3', _utf8mb4'val3'),_binary'blob data3')", + }, + data: [][]string{ + {"1", "{\"key1\": \"val1\"}", "blob data"}, + {"2", "{\"key2\": \"val2\"}", "blob data2"}, + {"3", "{\"key3\": \"val3\"}", "blob data3"}, + }, + }) + } else { + testCases = append(testCases, testCase{ input: "insert into src (id, jd, bd) values (1,'{\"key1\": \"val1\"}','blob data'), (2,'{\"key2\": \"val2\"}','blob data2'), (3,'{\"key3\": \"val3\"}','blob data3')", output: []string{ "insert into dst(id,jd,bd) values (1,JSON_OBJECT(_utf8mb4'key1', _utf8mb4'val1'),_binary'blob data'), (2,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'blob data2'), (3,JSON_OBJECT(_utf8mb4'key3', _utf8mb4'val3'),_binary'blob data3')", @@ -1571,7 +1587,7 @@ func TestPlayerPartialImages(t *testing.T) { {"2", "{\"key2\": \"val2\"}", "blob data2"}, {"3", "{\"key3\": \"val3\"}", "blob data3"}, }, - }, + }) } if runNoBlobTest { testCases = append(testCases, testCase{ @@ -1762,11 +1778,21 @@ func TestPlayerPartialImages(t *testing.T) { execStatements(t, []string{tc.input}) var want qh.ExpectationSequencer if tc.error != "" { - want = qh.Expect( - "rollback", - ).Then(qh.Immediately( - fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error), - )) + if vttablet.DefaultVReplicationConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching == 0 { + want = qh.Expect( + "begin", + "delete from dst where id=3", + "rollback", + ).Then(qh.Immediately( + fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error), + )) + } else { + want = qh.Expect( + "rollback", + ).Then(qh.Immediately( + fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error), + )) + } expectDBClientQueries(t, want) } else { want = qh.Expect( diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 1cedc01dbf1..fb4cb324047 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -1031,8 +1031,8 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea } if afterOK { rowChange.After = sqltypes.RowToProto3(afterValues) - if (vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) && - (partial || row.JSONPartialValues.Count() > 0) { + if ((vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) && partial) || + (row.JSONPartialValues.Count() > 0) { rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{ Count: int64(rows.DataColumns.Count()),