Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graphsupport for the Polypheny JDBC Driver #15

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
plugins {
id 'java'
id 'java-library'
id 'maven-publish'
id 'signing'
Expand All @@ -20,6 +21,7 @@ version = versionMajor + "." + versionMinor + (versionQualifier != '' ? "-" + ve

repositories {
mavenCentral()
mavenLocal()
maven {
url "https://plugins.gradle.org/m2/"
}
Expand Down Expand Up @@ -55,7 +57,7 @@ def protobufVersion = "3.23.4"

dependencies {
// Prism API files (protobuf files), needs to be implementation due to the prism-api-version.properties
implementation group: 'org.polypheny', name: 'prism', version: '1.9'
implementation group: 'org.polypheny', name: 'prism', version: '1.10'

// Protobuf
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: protobufVersion
Expand Down
44 changes: 42 additions & 2 deletions src/main/java/org/polypheny/jdbc/PrismInterfaceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import org.polypheny.prism.Entity;
import org.polypheny.prism.ExecuteIndexedStatementBatchRequest;
import org.polypheny.prism.ExecuteIndexedStatementRequest;
import org.polypheny.prism.ExecuteNamedStatementRequest;
import org.polypheny.prism.ExecuteUnparameterizedStatementBatchRequest;
import org.polypheny.prism.ExecuteUnparameterizedStatementRequest;
import org.polypheny.prism.FetchRequest;
import org.polypheny.prism.Frame;
import org.polypheny.prism.Function;
import org.polypheny.prism.FunctionsRequest;
import org.polypheny.prism.IndexedParameters;
import org.polypheny.prism.NamedParameters;
import org.polypheny.prism.Namespace;
import org.polypheny.prism.NamespacesRequest;
import org.polypheny.prism.PrepareStatementRequest;
Expand Down Expand Up @@ -145,15 +147,26 @@ public void unregister( int timeout ) throws PrismInterfaceServiceException {


public void executeUnparameterizedStatement( String namespaceName, String languageName, String statement, CallbackQueue<StatementResponse> callback, int timeout ) throws PrismInterfaceServiceException {
ExecuteUnparameterizedStatementRequest request = buildExecuteUnparameterizedStatementRequest( statement, namespaceName, languageName );
rpc.executeUnparameterizedStatement( request, callback ); // TODO timeout
}


public void executeUnparameterizedStatementBatch( List<String> statements, String namespaceName, String languageName, CallbackQueue<StatementBatchResponse> updateCallback, int timeout ) throws PrismInterfaceServiceException {
List<ExecuteUnparameterizedStatementRequest> requests = statements.stream().map( s -> buildExecuteUnparameterizedStatementRequest( s, namespaceName, languageName ) ).collect( Collectors.toList() );
executeUnparameterizedStatementBatch( requests, updateCallback, timeout );
}


public ExecuteUnparameterizedStatementRequest buildExecuteUnparameterizedStatementRequest( String statement, String namespaceName, String languageName ) {
ExecuteUnparameterizedStatementRequest.Builder requestBuilder = ExecuteUnparameterizedStatementRequest.newBuilder();
if ( namespaceName != null ) {
requestBuilder.setNamespaceName( namespaceName );
}
ExecuteUnparameterizedStatementRequest request = requestBuilder
return requestBuilder
.setLanguageName( languageName )
.setStatement( statement )
.build();
rpc.executeUnparameterizedStatement( request, callback ); // TODO timeout
}


Expand All @@ -179,6 +192,19 @@ public PreparedStatementSignature prepareIndexedStatement( String namespaceName,
}


public PreparedStatementSignature prepareNamedStatement( String namespaceName, String languageName, String statement, int timeout ) throws PrismInterfaceServiceException {
PrepareStatementRequest.Builder requestBuilder = PrepareStatementRequest.newBuilder();
if ( namespaceName != null ) {
requestBuilder.setNamespaceName( namespaceName );
}
PrepareStatementRequest request = requestBuilder
.setStatement( statement )
.setLanguageName( languageName )
.build();
return rpc.prepareNamedStatement( request, timeout );
}


public StatementResult executeIndexedStatement( int statementId, List<TypedValue> values, int fetchSize, int timeout ) throws PrismInterfaceServiceException {
IndexedParameters parameters = IndexedParameters.newBuilder()
.addAllParameters( ProtoUtils.serializeParameterList( values ) )
Expand All @@ -193,6 +219,20 @@ public StatementResult executeIndexedStatement( int statementId, List<TypedValue
}


public StatementResult executeNamedStatement( int statementId, Map<String, TypedValue> values, int fetchSize, int timeout ) throws PrismInterfaceServiceException {
NamedParameters parameters = NamedParameters.newBuilder()
.putAllParameters( ProtoUtils.serializeParameterMap( values ) )
.build();
ExecuteNamedStatementRequest request = ExecuteNamedStatementRequest.newBuilder()
.setStatementId( statementId )
.setParameters( parameters )
.setFetchSize( fetchSize )
.build();

return rpc.executeNamedStatement( request, timeout );
}


public StatementBatchResponse executeIndexedStatementBatch( int statementId, List<List<TypedValue>> parameterBatch, int timeout ) throws PrismInterfaceServiceException {
List<IndexedParameters> parameters = parameterBatch.stream()
.map( ProtoUtils::serializeParameterList )
Expand Down
47 changes: 31 additions & 16 deletions src/main/java/org/polypheny/jdbc/RpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.polypheny.prism.EntitiesResponse;
import org.polypheny.prism.ExecuteIndexedStatementBatchRequest;
import org.polypheny.prism.ExecuteIndexedStatementRequest;
import org.polypheny.prism.ExecuteNamedStatementRequest;
import org.polypheny.prism.ExecuteUnparameterizedStatementBatchRequest;
import org.polypheny.prism.ExecuteUnparameterizedStatementRequest;
import org.polypheny.prism.FetchRequest;
Expand Down Expand Up @@ -90,25 +91,25 @@
public class RpcService {

private final AtomicLong idCounter = new AtomicLong( 1 );
private final Transport con;
private final Transport connection;
private final Thread service;
private boolean closed = false;
private boolean disconnectSent = false;
private boolean isClosed = false;
private boolean hasSentDisconnect = false;
private IOException error = null;
private final Map<Long, CompletableFuture<Response>> callbacks = new ConcurrentHashMap<>();
private final Map<Long, CallbackQueue<?>> callbackQueues = new ConcurrentHashMap<>();


RpcService( Transport con ) {
this.con = con;
RpcService( Transport connection ) {
this.connection = connection;
this.service = new Thread( this::readResponses, "PrismInterfaceResponseHandler" );
this.service.start();
}


void close() {
closed = true;
con.close();
isClosed = true;
connection.close();
try {
service.join();
} catch ( InterruptedException e ) {
Expand All @@ -131,15 +132,15 @@ private void sendMessage( Request req ) throws IOException {
throw e;
}
}
if ( this.closed ) {
if ( this.isClosed ) {
throw new IOException( "Connection is closed" );
}
con.sendMessage( req.toByteArray() );
connection.sendMessage( req.toByteArray() );
}


private Response receiveMessage() throws IOException {
return Response.parseFrom( con.receiveMessage() );
return Response.parseFrom( connection.receiveMessage() );
}


Expand Down Expand Up @@ -177,27 +178,27 @@ private void readResponses() {
c.complete( resp );
}
} catch ( EOFException | ClosedChannelException e ) {
this.closed = true;
this.isClosed = true;
callbacks.forEach( ( id, c ) -> c.completeExceptionally( e ) );
callbackQueues.forEach( ( id, cq ) -> cq.onError( e ) );
} catch ( IOException e ) { // Communicate this to ProtoInterfaceClient
this.closed = true;
this.isClosed = true;
callbacks.forEach( ( id, c ) -> c.completeExceptionally( e ) );
callbackQueues.forEach( ( id, cq ) -> cq.onError( e ) );
/* For Windows */
if ( e.getMessage().contains( "An existing connection was forcibly closed by the remote host" ) && disconnectSent ) {
if ( e.getMessage().contains( "An existing connection was forcibly closed by the remote host" ) && hasSentDisconnect ) {
return;
}
/* For Windows */
if ( e instanceof SocketException && e.getMessage().contains( "Connection reset" ) && disconnectSent ) {
if ( e instanceof SocketException && e.getMessage().contains( "Connection reset" ) && hasSentDisconnect ) {
return;
}
// This will cause the exception to be thrown when the next call is made
// TODO: Is this good enough, or should the program be alerted sooner?
this.error = e;
throw new RuntimeException( e );
} catch ( Throwable t ) {
this.closed = true;
this.isClosed = true;
callbacks.forEach( ( id, c ) -> c.completeExceptionally( t ) );
callbackQueues.forEach( ( id, cq ) -> cq.onError( t ) );
log.error( "Unhandled exception", t );
Expand All @@ -224,7 +225,7 @@ private Response completeSynchronously( Request.Builder req, int timeout ) throw
CompletableFuture<Response> f = new CompletableFuture<>();
callbacks.put( req.getId(), f );
if ( req.getTypeCase() == TypeCase.DISCONNECT_REQUEST ) {
disconnectSent = true;
hasSentDisconnect = true;
}
sendMessage( req.build() );
Response resp = waitForCompletion( f, timeout );
Expand Down Expand Up @@ -424,13 +425,27 @@ PreparedStatementSignature prepareIndexedStatement( PrepareStatementRequest msg,
}


public PreparedStatementSignature prepareNamedStatement( PrepareStatementRequest msg, int timeout ) throws PrismInterfaceServiceException {
Request.Builder req = newMessage();
req.setPrepareIndexedStatementRequest( msg );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not be setPrepareNamedStatementRequest?

return completeSynchronously( req, timeout ).getPreparedStatementSignature();
}


StatementResult executeIndexedStatement( ExecuteIndexedStatementRequest msg, int timeout ) throws PrismInterfaceServiceException {
Request.Builder req = newMessage();
req.setExecuteIndexedStatementRequest( msg );
return completeSynchronously( req, timeout ).getStatementResult();
}


public StatementResult executeNamedStatement( ExecuteNamedStatementRequest msg, int timeout ) throws PrismInterfaceServiceException {
Request.Builder req = newMessage();
req.setExecuteNamedStatementRequest( msg );
return completeSynchronously( req, timeout ).getStatementResult();
}


StatementBatchResponse executeIndexedStatementBatch( ExecuteIndexedStatementBatchRequest msg, int timeout ) throws PrismInterfaceServiceException {
Request.Builder req = newMessage();
req.setExecuteIndexedStatementBatchRequest( msg );
Expand Down
116 changes: 116 additions & 0 deletions src/main/java/org/polypheny/jdbc/multimodel/GraphResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2019-2024 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.jdbc.multimodel;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.polypheny.jdbc.PolyConnection;
import org.polypheny.jdbc.PrismInterfaceClient;
import org.polypheny.jdbc.PrismInterfaceErrors;
import org.polypheny.jdbc.PrismInterfaceServiceException;
import org.polypheny.jdbc.properties.PropertyUtils;
import org.polypheny.jdbc.types.PolyGraphElement;
import org.polypheny.prism.Frame;
import org.polypheny.prism.Frame.ResultCase;
import org.polypheny.prism.GraphFrame;

public class GraphResult extends Result implements Iterable<PolyGraphElement> { // implements iterable over some graph representation

private final PolyStatement polyStatement;
private boolean isFullyFetched;
private final List<PolyGraphElement> elements;


public GraphResult( Frame frame, PolyStatement polyStatement ) {
super( ResultType.GRAPH );
this.polyStatement = polyStatement;
this.isFullyFetched = frame.getIsLast();
this.elements = new ArrayList<>();
addGraphElements( frame.getGraphFrame() );
}


private void addGraphElements( GraphFrame graphFrame ) {
graphFrame.getElementList().forEach( n -> elements.add( PolyGraphElement.of( n ) ) );
}


private void fetchMore() throws PrismInterfaceServiceException {
int id = polyStatement.getStatementId();
int timeout = getPolyphenyConnection().getTimeout();
Frame frame = getPrismInterfaceClient().fetchResult( id, timeout, PropertyUtils.getDEFAULT_FETCH_SIZE() );
if ( frame.getResultCase() != ResultCase.GRAPH_FRAME ) {
throw new PrismInterfaceServiceException(
PrismInterfaceErrors.RESULT_TYPE_INVALID,
"Statement returned a result of illegal type " + frame.getResultCase()
);
}
isFullyFetched = frame.getIsLast();
addGraphElements( frame.getGraphFrame() );
}


private PolyConnection getPolyphenyConnection() {
return polyStatement.getConnection();
}


private PrismInterfaceClient getPrismInterfaceClient() {
return getPolyphenyConnection().getPrismInterfaceClient();
}


@Override
public Iterator<PolyGraphElement> iterator() {
return new GraphElementIterator();
}


class GraphElementIterator implements Iterator<PolyGraphElement> {

int index = -1;


@Override
public boolean hasNext() {
if ( index + 1 >= elements.size() ) {
if ( isFullyFetched ) {
return false;
}
try {
fetchMore();
} catch ( PrismInterfaceServiceException e ) {
throw new RuntimeException( e );
}
}
return index + 1 < elements.size();
}


@Override
public PolyGraphElement next() {
if ( !hasNext() ) {
throw new NoSuchElementException( "There are no more graph elements" );
}
return elements.get( ++index );
}

}

}
Loading
Loading