Commit
chore: some MORE nit fixes
iamKunalGupta committed Jun 11, 2024
1 parent 81cd311 commit 079d627
Showing 6 changed files with 25 additions and 23 deletions.
@@ -8,17 +8,17 @@

import java.io.IOException;

- public class AvroIcebergConverter {
+ public class AvroIcebergRecordConverter {
private final org.apache.iceberg.Schema icebergSchema;
private final Schema icebergAvroSchema;
private final DataReader<org.apache.iceberg.data.GenericRecord> dataReader;

- public AvroIcebergConverter(String avroSchemaString, org.apache.iceberg.Schema icebergSchema, String tableName) {
+ public AvroIcebergRecordConverter(String avroSchemaString, org.apache.iceberg.Schema icebergSchema, String tableName) {
this(new Schema.Parser().parse(avroSchemaString), icebergSchema, tableName);

}

- public AvroIcebergConverter(Schema sourceAvroSchema, org.apache.iceberg.Schema icebergSchema, String tableName) {
+ public AvroIcebergRecordConverter(Schema sourceAvroSchema, org.apache.iceberg.Schema icebergSchema, String tableName) {
this.icebergSchema = icebergSchema;
this.icebergAvroSchema = AvroSchemaUtil.convert(icebergSchema, tableName);
this.dataReader = DataReader.create(icebergSchema, icebergAvroSchema);
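For context on this rename: a minimal usage sketch of the converter, assuming a hypothetical one-column table and schema-less Avro binary encoding for the record bytes. Only the constructor signature and toIcebergRecord(byte[]) come from the diff; the schemas, table name, and the encoding helper are illustrative, and AvroIcebergRecordConverter is assumed to be on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class ConverterSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical Iceberg schema: a single required "id" column.
        Schema icebergSchema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()));

        // Matching source Avro schema, as a JSON string.
        String avroSchemaJson = "{\"type\":\"record\",\"name\":\"row\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";

        var converter = new AvroIcebergRecordConverter(avroSchemaJson, icebergSchema, "demo_table");

        // Encode one Avro datum to schema-less binary bytes, standing in for
        // the payload that InsertRecord#getRecord() would carry.
        var avroSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaJson);
        var datum = new GenericData.Record(avroSchema);
        datum.put("id", 42L);
        var out = new ByteArrayOutputStream();
        var encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericData.Record>(avroSchema).write(datum, encoder);
        encoder.flush();

        // Decode the bytes back into an Iceberg GenericRecord via the converter.
        System.out.println(converter.toIcebergRecord(out.toByteArray()));
    }
}
```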
@@ -4,7 +4,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.Streams;
import io.peerdb.flow.jvm.grpc.*;
- import io.peerdb.flow.jvm.iceberg.avro.AvroIcebergConverter;
+ import io.peerdb.flow.jvm.iceberg.avro.AvroIcebergRecordConverter;
import io.peerdb.flow.jvm.iceberg.catalog.CatalogLoader;
import io.peerdb.flow.jvm.iceberg.lock.LockManager;
import io.peerdb.flow.jvm.iceberg.writer.RecordWriterFactory;
@@ -42,7 +42,7 @@ public class IcebergService {
@Inject
RecordWriterFactory recordWriterFactory;

- private static void writeRecordStream(Stream<InsertRecord> recordStream, AvroIcebergConverter converter, TaskWriter<Record> writer) {
+ private static void writeRecordStream(Stream<InsertRecord> recordStream, AvroIcebergRecordConverter converter, TaskWriter<Record> writer) {
recordStream.parallel().map(insertRecord -> {
try {
return converter.toIcebergRecord(insertRecord.getRecord().toByteArray());
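The diff truncates this method after the map step; here is a sketch of its plausible full shape. The UncheckedIOException wrapping and the terminal write step are assumptions. forEachOrdered is used because TaskWriter is not thread-safe, so conversion can run in parallel while writes stay serialized.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.stream.Stream;

import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;

// InsertRecord comes from io.peerdb.flow.jvm.grpc, per the imports above.
private static void writeRecordStream(Stream<InsertRecord> recordStream,
                                      AvroIcebergRecordConverter converter,
                                      TaskWriter<Record> writer) {
    recordStream.parallel().map(insertRecord -> {
        try {
            // Decode the Avro payload into an Iceberg record (shown in the diff).
            return converter.toIcebergRecord(insertRecord.getRecord().toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }).forEachOrdered(record -> {
        try {
            writer.write(record); // serialized: TaskWriter is not thread-safe
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
}
```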
@@ -108,8 +108,11 @@ private boolean appendRecords(TableInfo tableInfo, String avroSchema, List<InsertRecord> insertRecords)
return true;
}
var recordStream = insertRecords.stream();

Log.infof("Converting append records to data files for table %s", table.name());
var dataFiles = getAppendDataFiles(avroSchema, table, recordStream);
Log.infof("Completed writing %d records for table %s", Arrays.stream(dataFiles).map(ContentFile::recordCount).reduce(0L, Long::sum), table.name());
var recordCount = Arrays.stream(dataFiles).map(ContentFile::recordCount).reduce(0L, Long::sum);
Log.infof("Converted %d records to %d data files for table %s", recordCount, dataFiles.length, table.name());


var lockKey = List.of(tableInfo.getIcebergCatalog().toString(), tableInfo.getNamespaceList(), tableInfo.getTableName());
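What plausibly happens next with these data files, sketched with the standard Iceberg append API. The commit presumably runs while holding the lock identified by lockKey above, but the exact LockManager calls are not visible in the diff, so they are elided here.

```java
// Commit all freshly written data files as one atomic Iceberg snapshot.
// (Presumably executed while holding the lock identified by lockKey.)
var append = table.newAppend();
for (var dataFile : dataFiles) {
    append.appendFile(dataFile);
}
append.commit(); // single snapshot: readers see all new files or none
```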
@@ -155,7 +158,7 @@ private boolean appendRecords(TableInfo tableInfo, String avroSchema, List<InsertRecord> insertRecords)
private DataFile[] getAppendDataFiles(String avroSchema, Table table, Stream<InsertRecord> recordStream) {
WriteResult writeResult;
try (var writer = recordWriterFactory.createRecordWriter(table)) {
- var converter = new AvroIcebergConverter(avroSchema, table.schema(), table.name());
+ var converter = new AvroIcebergRecordConverter(avroSchema, table.schema(), table.name());
Log.infof("Will now write records to append to table %s", table.name());
var stopwatch = Stopwatch.createStarted();
writeRecordStream(recordStream, converter, writer);
@@ -222,7 +225,7 @@ public boolean insertChanges(TableInfo tableInfo, String avroSchema, List<RecordChange> recordChanges)
}
var writer = recordWriterFactory.createRecordWriter(table);

- var converter = new AvroIcebergConverter(avroSchema, table.schema(), table.name());
+ var converter = new AvroIcebergRecordConverter(avroSchema, table.schema(), table.name());
recordChanges.forEach(recordChange -> {
switch (recordChange.getChangeCase()) {
case INSERT:
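The switch body is cut off in the diff; a sketch of how the INSERT branch plausibly continues, reusing the converter-plus-writer pattern from appendRecords. The getInsert() accessor follows protobuf's oneof getter convention and is an assumption, as is the default branch.

```java
recordChanges.forEach(recordChange -> {
    switch (recordChange.getChangeCase()) {
        case INSERT:
            try {
                // Hypothetical oneof accessor; decode and write as appendRecords does.
                writer.write(converter.toIcebergRecord(
                        recordChange.getInsert().getRecord().toByteArray()));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            break;
        default:
            throw new IllegalStateException(
                    "Unhandled change case: " + recordChange.getChangeCase());
    }
});
```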
15 changes: 8 additions & 7 deletions flow/connectors/iceberg/qrep.go
@@ -42,10 +42,11 @@ func (c *IcebergConnector) SyncQRepRecords(
}
binaryRecords := make([]*protos.InsertRecord, 0)
for record := range stream.Records {
- // Add soft delete
- record = append(record, qvalue.QValueBoolean{
- Val: false,
- }, // add synced at colname
+ record = append(record,
+ // Add soft delete
+ qvalue.QValueBoolean{
+ Val: false,
+ }, // add synced at colname
qvalue.QValueTimestampTZ{
Val: time.Now(),
})
@@ -105,13 +106,13 @@ func getAvroSchema(
return avroSchema, nil
}

- // S3 just sets up destination, not metadata tables
+ // Iceberg just sets up destination, not metadata tables
func (c *IcebergConnector) SetupQRepMetadataTables(_ context.Context, config *protos.QRepConfig) error {
c.logger.Info("QRep metadata setup not needed for S3.")
c.logger.Info("QRep metadata setup not needed for Iceberg.")
return nil
}

- // S3 doesn't check if partition is already synced, but file with same name is overwritten
+ // Iceberg doesn't check if partition is already synced, but file with same name is overwritten
func (c *IcebergConnector) IsQRepPartitionSynced(ctx context.Context,
config *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
3 changes: 0 additions & 3 deletions flow/logger/handler.go
@@ -23,9 +23,6 @@ func NewHandler(handler slog.Handler) slog.Handler {
}

func (h Handler) Enabled(ctx context.Context, level slog.Level) bool {
- if level == slog.LevelDebug {
- return true
- }
return h.handler.Enabled(ctx, level)
}

5 changes: 0 additions & 5 deletions flow/peerdbenv/config.go
@@ -93,11 +93,6 @@ func PeerDBAlertingEmailSenderReplyToAddresses() string {
return GetEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "")
}

- // PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME
- func PeerDBClickhouseAWSS3BucketName() string {
- return GetEnvString("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME", "")
- }

// PeerDBFlowJvmAddress is the URL of the gRPC server for the JVM-based proxy, Eg: "localhost:9801"
func PeerDBFlowJvmAddress() string {
return GetEnvString("PEERDB_FLOW_JVM_ADDRESS", "")
6 changes: 6 additions & 0 deletions nexus/catalog/src/lib.rs
@@ -105,6 +105,7 @@ impl Catalog {
elasticsearch_config.encode_to_vec()
}
Config::MysqlConfig(mysql_config) => mysql_config.encode_to_vec(),
+ Config::IcebergConfig(iceberg_config) => {iceberg_config.encode_to_vec()}
}
};

@@ -334,6 +335,11 @@ impl Catalog {
pt::peerdb_peers::MySqlConfig::decode(options).with_context(err)?;
Config::MysqlConfig(mysql_config)
}
+ DbType::Iceberg => {
+ let iceberg_config =
+ pt::peerdb_peers::IcebergConfig::decode(options).with_context(err)?;
+ Config::IcebergConfig(iceberg_config)
+ }
})
} else {
None
