From fd4e220e1923a005f0bef5595719fb7614ef71b7 Mon Sep 17 00:00:00 2001 From: amolskh Date: Mon, 10 Aug 2015 20:17:24 -0700 Subject: [PATCH] Set of Chekings for Checksum support --- .classpath | 39 ++++++------------ .project | 41 ++++++++----------- .../or/binlog/impl/FileBasedBinlogParser.java | 26 ++++++++++++ .../impl/parser/DeleteRowsEventV2Parser.java | 3 +- .../impl/parser/UpdateRowsEventV2Parser.java | 3 +- .../impl/parser/WriteRowsEventV2Parser.java | 3 +- .../code/or/common/util/CodecUtils.java | 12 ++++++ .../code/or/io/impl/XInputStreamImpl.java | 20 +++++++++ .../com/google/code/or/OpenParserTest.java | 4 +- 9 files changed, 96 insertions(+), 55 deletions(-) diff --git a/.classpath b/.classpath index b0e44f4..4b606b5 100644 --- a/.classpath +++ b/.classpath @@ -1,27 +1,12 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + \ No newline at end of file diff --git a/.project b/.project index 2a5f0e6..5cdf2bd 100644 --- a/.project +++ b/.project @@ -1,23 +1,18 @@ - - - open-replicator - - - - - - org.eclipse.jdt.core.javabuilder - - - - - org.eclipse.m2e.core.maven2Builder - - - - - - org.eclipse.jdt.core.javanature - org.eclipse.m2e.core.maven2Nature - - + + + open-replicator + Open Replicator is a high performance MySQL binlog parser written in Java. NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse. + + + + org.eclipse.jdt.core.javabuilder + + + org.eclipse.m2e.core.maven2Builder + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + \ No newline at end of file diff --git a/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java b/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java index 8618193..95b0d4a 100644 --- a/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java +++ b/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java @@ -18,6 +18,8 @@ import java.io.File; import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; +import java.util.zip.Checksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +47,7 @@ public class FileBasedBinlogParser extends AbstractBinlogParser { protected String binlogFilePath; protected long stopPosition = 0; protected long startPosition = 4; + public static int availableLimit = 0; /** @@ -109,6 +112,7 @@ protected void doParse() throws Exception { try { // final BinlogEventV4HeaderImpl header = new BinlogEventV4HeaderImpl(); + ((XInputStreamImpl)is).startByteRecording(); header.setTimestamp(is.readLong(4) * 1000L); header.setEventType(is.readInt(1)); header.setServerId(is.readLong(4)); @@ -117,6 +121,7 @@ protected void doParse() throws Exception { header.setFlags(is.readInt(2)); header.setTimestampOfReceipt(System.currentTimeMillis()); is.setReadLimit((int)(header.getEventLength() - header.getHeaderLength())); // Ensure the event boundary + if(isVerbose() && LOGGER.isInfoEnabled()) { LOGGER.info("read an event, header: {}", header); } @@ -133,6 +138,27 @@ protected void doParse() throws Exception { BinlogEventParser parser = getEventParser(header.getEventType()); if(parser == null) parser = this.defaultParser; parser.parse(is, header, context); + + byte[] eventBytes = ((XInputStreamImpl)is).stopRecording(); + if(is.available() == 4 && null != eventBytes) + { + is.setReadLimit(4); + byte[] checkSumBytes = is.readBytes(4); + Checksum checksumUtility = new CRC32(); + checksumUtility.update(eventBytes, 0, eventBytes.length); + long checkSumValComputed = checksumUtility.getValue(); + long checkSumValReceived = CodecUtils.toLong(CodecUtils.toBigEndian(checkSumBytes),0,checkSumBytes.length); + if(checkSumValComputed != checkSumValReceived) + { + throw new RuntimeException("Checksum did not match for event type: " + header.getEventType()); + } + else + { + //Event size greater than normal is used to detect that file has checksum enabled + //This will be used for further event processing + availableLimit = 4; + } + } } // Ensure the packet boundary diff --git a/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java b/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java index b6f52d8..d6be509 100644 --- a/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java +++ b/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java @@ -22,6 +22,7 @@ import com.google.code.or.binlog.BinlogEventV4Header; import com.google.code.or.binlog.BinlogParserContext; +import com.google.code.or.binlog.impl.FileBasedBinlogParser; import com.google.code.or.binlog.impl.event.DeleteRowsEventV2; import com.google.code.or.binlog.impl.event.TableMapEvent; import com.google.code.or.common.glossary.Row; @@ -71,7 +72,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte protected List parseRows(XInputStream is, TableMapEvent tme, DeleteRowsEventV2 dre) throws IOException { final List r = new LinkedList(); - while(is.available() > 0) { + while(is.available() > FileBasedBinlogParser.availableLimit) { r.add(parseRow(is, tme, dre.getUsedColumns())); } return r; diff --git a/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java b/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java index 83f2642..29cf031 100644 --- a/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java +++ b/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java @@ -22,6 +22,7 @@ import com.google.code.or.binlog.BinlogEventV4Header; import com.google.code.or.binlog.BinlogParserContext; +import com.google.code.or.binlog.impl.FileBasedBinlogParser; import com.google.code.or.binlog.impl.event.TableMapEvent; import com.google.code.or.binlog.impl.event.UpdateRowsEventV2; import com.google.code.or.common.glossary.Pair; @@ -73,7 +74,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte protected List> parseRows(XInputStream is, TableMapEvent tme, UpdateRowsEventV2 ure) throws IOException { final List> r = new LinkedList>(); - while(is.available() > 0) { + while(is.available() > FileBasedBinlogParser.availableLimit) { final Row before = parseRow(is, tme, ure.getUsedColumnsBefore()); final Row after = parseRow(is, tme, ure.getUsedColumnsAfter()); r.add(new Pair(before, after)); diff --git a/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java b/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java index ffdd119..26ee173 100644 --- a/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java +++ b/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java @@ -22,6 +22,7 @@ import com.google.code.or.binlog.BinlogEventV4Header; import com.google.code.or.binlog.BinlogParserContext; +import com.google.code.or.binlog.impl.FileBasedBinlogParser; import com.google.code.or.binlog.impl.event.TableMapEvent; import com.google.code.or.binlog.impl.event.WriteRowsEventV2; import com.google.code.or.common.glossary.Row; @@ -71,7 +72,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte protected List parseRows(XInputStream is, TableMapEvent tme, WriteRowsEventV2 wre) throws IOException { final List r = new LinkedList(); - while(is.available() > 0) { + while(is.available() > FileBasedBinlogParser.availableLimit) { r.add(parseRow(is, tme, wre.getUsedColumns())); } return r; diff --git a/src/main/java/com/google/code/or/common/util/CodecUtils.java b/src/main/java/com/google/code/or/common/util/CodecUtils.java index 3ecf246..233eaa7 100644 --- a/src/main/java/com/google/code/or/common/util/CodecUtils.java +++ b/src/main/java/com/google/code/or/common/util/CodecUtils.java @@ -21,6 +21,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; +import java.util.List; /** * @@ -87,6 +88,17 @@ public static byte[] toByteArray(long num) { return r; } + public static byte[] toByteArray(List in) + { + final int n = in.size(); + byte ret[] = new byte[n]; + for (int i = 0; i < n; i++) + { + ret[i] = in.get(i); + } + return ret; + } + /** * */ diff --git a/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java b/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java index 6759886..d71b276 100644 --- a/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java +++ b/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java @@ -19,6 +19,8 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import com.google.code.or.common.glossary.UnsignedLong; import com.google.code.or.common.glossary.column.BitColumn; @@ -40,6 +42,7 @@ public class XInputStreamImpl extends InputStream implements XInputStream { private int readLimit = 0; private final byte[] buffer; private final InputStream is; + private List recordBytes; /** @@ -47,11 +50,25 @@ public class XInputStreamImpl extends InputStream implements XInputStream { */ public XInputStreamImpl(InputStream is) { this(is, 512 * 1024); + recordBytes = new ArrayList(); } public XInputStreamImpl(InputStream is, int size) { this.is = is; this.buffer = new byte[size]; + recordBytes = new ArrayList(); + } + + + public void startByteRecording() + { + recordBytes = new ArrayList(); + } + + public byte[] stopRecording() + { + byte[] b = CodecUtils.toByteArray(recordBytes); + return b; } /** @@ -68,6 +85,8 @@ public long readLong(int length) throws IOException { public byte[] readBytes(int length) throws IOException { final byte[] r = new byte[length]; this.read(r, 0, length); + for(int i=0;i= this.tail) doFill(); + recordBytes.add(new Byte(this.buffer[this.head])); final int r = this.buffer[this.head++] & 0xFF; ++this.readCount; return r; diff --git a/src/test/java/com/google/code/or/OpenParserTest.java b/src/test/java/com/google/code/or/OpenParserTest.java index 430c26e..bcc1946 100644 --- a/src/test/java/com/google/code/or/OpenParserTest.java +++ b/src/test/java/com/google/code/or/OpenParserTest.java @@ -22,8 +22,8 @@ public static void main(String args[]) throws Exception { // final OpenParser op = new OpenParser(); op.setStartPosition(4); - op.setBinlogFileName("mysql_bin.000031"); - op.setBinlogFilePath("C:/Documents and Settings/All Users/Application Data/MySQL/MySQL Server 5.5/data"); + op.setBinlogFileName("coresa-log-bin.002767"); + op.setBinlogFilePath("/home/amolk/"); op.setBinlogEventListener(new BinlogEventListener() { public void onEvents(BinlogEventV4 event) { if(event instanceof XidEvent) {