Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set of Chekings for Checksum support #11

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 12 additions & 27 deletions .classpath
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="src/test/resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/test/java" output="target/test-classes" including="**/*.java"/>
<classpathentry kind="src" path="src/test/resources" output="target/test-classes" excluding="**/*.java"/>
<classpathentry kind="src" path="src/main/java" including="**/*.java"/>
<classpathentry kind="output" path="target/classes"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="var" path="M2_REPO/junit/junit/3.8.1/junit-3.8.1.jar"/>
<classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-api/1.7.5/slf4j-api-1.7.5.jar"/>
<classpathentry kind="var" path="M2_REPO/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar"/>
<classpathentry kind="var" path="M2_REPO/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar"/>
</classpath>
41 changes: 18 additions & 23 deletions .project
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>open-replicator</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>open-replicator</name>
<comment>Open Replicator is a high performance MySQL binlog parser written in Java. NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
<projects/>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -45,6 +47,7 @@ public class FileBasedBinlogParser extends AbstractBinlogParser {
protected String binlogFilePath;
protected long stopPosition = 0;
protected long startPosition = 4;
public static int availableLimit = 0;


/**
Expand Down Expand Up @@ -109,6 +112,7 @@ protected void doParse() throws Exception {
try {
//
final BinlogEventV4HeaderImpl header = new BinlogEventV4HeaderImpl();
((XInputStreamImpl)is).startByteRecording();
header.setTimestamp(is.readLong(4) * 1000L);
header.setEventType(is.readInt(1));
header.setServerId(is.readLong(4));
Expand All @@ -117,6 +121,7 @@ protected void doParse() throws Exception {
header.setFlags(is.readInt(2));
header.setTimestampOfReceipt(System.currentTimeMillis());
is.setReadLimit((int)(header.getEventLength() - header.getHeaderLength())); // Ensure the event boundary

if(isVerbose() && LOGGER.isInfoEnabled()) {
LOGGER.info("read an event, header: {}", header);
}
Expand All @@ -133,6 +138,27 @@ protected void doParse() throws Exception {
BinlogEventParser parser = getEventParser(header.getEventType());
if(parser == null) parser = this.defaultParser;
parser.parse(is, header, context);

byte[] eventBytes = ((XInputStreamImpl)is).stopRecording();
if(is.available() == 4 && null != eventBytes)
{
is.setReadLimit(4);
byte[] checkSumBytes = is.readBytes(4);
Checksum checksumUtility = new CRC32();
checksumUtility.update(eventBytes, 0, eventBytes.length);
long checkSumValComputed = checksumUtility.getValue();
long checkSumValReceived = CodecUtils.toLong(CodecUtils.toBigEndian(checkSumBytes),0,checkSumBytes.length);
if(checkSumValComputed != checkSumValReceived)
{
throw new RuntimeException("Checksum did not match for event type: " + header.getEventType());
}
else
{
//Event size greater than normal is used to detect that file has checksum enabled
//This will be used for further event processing
availableLimit = 4;
}
}
}

// Ensure the packet boundary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.BinlogParserContext;
import com.google.code.or.binlog.impl.FileBasedBinlogParser;
import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.common.glossary.Row;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte
protected List<Row> parseRows(XInputStream is, TableMapEvent tme, DeleteRowsEventV2 dre)
throws IOException {
final List<Row> r = new LinkedList<Row>();
while(is.available() > 0) {
while(is.available() > FileBasedBinlogParser.availableLimit) {
r.add(parseRow(is, tme, dre.getUsedColumns()));
}
return r;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.BinlogParserContext;
import com.google.code.or.binlog.impl.FileBasedBinlogParser;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
import com.google.code.or.common.glossary.Pair;
Expand Down Expand Up @@ -73,7 +74,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte
protected List<Pair<Row>> parseRows(XInputStream is, TableMapEvent tme, UpdateRowsEventV2 ure)
throws IOException {
final List<Pair<Row>> r = new LinkedList<Pair<Row>>();
while(is.available() > 0) {
while(is.available() > FileBasedBinlogParser.availableLimit) {
final Row before = parseRow(is, tme, ure.getUsedColumnsBefore());
final Row after = parseRow(is, tme, ure.getUsedColumnsAfter());
r.add(new Pair<Row>(before, after));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.BinlogParserContext;
import com.google.code.or.binlog.impl.FileBasedBinlogParser;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
import com.google.code.or.common.glossary.Row;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte
protected List<Row> parseRows(XInputStream is, TableMapEvent tme, WriteRowsEventV2 wre)
throws IOException {
final List<Row> r = new LinkedList<Row>();
while(is.available() > 0) {
while(is.available() > FileBasedBinlogParser.availableLimit) {
r.add(parseRow(is, tme, wre.getUsedColumns()));
}
return r;
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/google/code/or/common/util/CodecUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

/**
*
Expand Down Expand Up @@ -87,6 +88,17 @@ public static byte[] toByteArray(long num) {
return r;
}

public static byte[] toByteArray(List<Byte> in)
{
final int n = in.size();
byte ret[] = new byte[n];
for (int i = 0; i < n; i++)
{
ret[i] = in.get(i);
}
return ret;
}

/**
*
*/
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import com.google.code.or.common.glossary.UnsignedLong;
import com.google.code.or.common.glossary.column.BitColumn;
Expand All @@ -40,18 +42,33 @@ public class XInputStreamImpl extends InputStream implements XInputStream {
private int readLimit = 0;
private final byte[] buffer;
private final InputStream is;
private List<Byte> recordBytes;


/**
*
*/
public XInputStreamImpl(InputStream is) {
this(is, 512 * 1024);
recordBytes = new ArrayList<Byte>();
}

public XInputStreamImpl(InputStream is, int size) {
this.is = is;
this.buffer = new byte[size];
recordBytes = new ArrayList<Byte>();
}


public void startByteRecording()
{
recordBytes = new ArrayList<Byte>();
}

public byte[] stopRecording()
{
byte[] b = CodecUtils.toByteArray(recordBytes);
return b;
}

/**
Expand All @@ -68,6 +85,8 @@ public long readLong(int length) throws IOException {
public byte[] readBytes(int length) throws IOException {
final byte[] r = new byte[length];
this.read(r, 0, length);
for(int i=0;i<length;i++)
recordBytes.add(r[i]);
return r;
}

Expand Down Expand Up @@ -211,6 +230,7 @@ public int read() throws IOException {
throw new ExceedLimitException();
} else {
if(this.head >= this.tail) doFill();
recordBytes.add(new Byte(this.buffer[this.head]));
final int r = this.buffer[this.head++] & 0xFF;
++this.readCount;
return r;
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/google/code/or/OpenParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public static void main(String args[]) throws Exception {
//
final OpenParser op = new OpenParser();
op.setStartPosition(4);
op.setBinlogFileName("mysql_bin.000031");
op.setBinlogFilePath("C:/Documents and Settings/All Users/Application Data/MySQL/MySQL Server 5.5/data");
op.setBinlogFileName("coresa-log-bin.002767");
op.setBinlogFilePath("/home/amolk/");
op.setBinlogEventListener(new BinlogEventListener() {
public void onEvents(BinlogEventV4 event) {
if(event instanceof XidEvent) {
Expand Down