postgresql - How to insert into multiple tables in parallel with foreign key relationships - Stack Overflow

I want to insert into multiple tables in parallel, these tables have foreign key relationships.The wa

I want to insert into multiple tables in parallel, these tables have foreign key relationships. The way I am attempting to do it (below) does not work i get a foreign key violation.
java.lang.Exception: .postgresql.util.PSQLException: ERROR: insert or update on table "child2" violates foreign key constraint "child2_parent_id_fkey" I know what i'm doing here not the correct way to do this but I'm having trouble understanding from what i'm reading online what the best practice is. My goals here are

  1. insert in parallel (if posible)
  2. be able to rollback all tables on any error.
class Scratch (creds: DbCreds, dbType: String, driverClassName: String, initialPoolSize: Int = 4,
               maxPoolSize: Int = 10, queryTimeout: Int = 600000) extends DbConnectionPool with Configs {

  val connectionPool = setConnectionPool(creds, dbType, driverClassName, initialPoolSize, maxPoolSize, queryTimeout)
  implicit val ec = ExecutionContext.global

  def updateTables(): Unit = {

//     these are how the tables are set up: 
//    create table parent (parent_id int primary key);
//    create table child (id int primary key, parent_id int references parent(parent_id));
//    create table child2 (id int primary key, parent_id int references parent(parent_id));

    val parentTableConn: Connection = getNonCommitConnection()
    val childTableConn: Connection  = getNonCommitConnection()
    val childTable2Conn: Connection = getNonCommitConnection()

    try {
      val parentTableFuture = Future {
        updateParent(parentTableConn)
      }
      parentTableFuture.onComplete {
        case Success(x) => println("-parent table updated successfully")
        case Failure(error) => throw new Exception(error)
      }
      Await.result(parentTableFuture, queryTimeout millisecond)
      val childTableFuture = Future {
        updateChild(childTableConn)
      }
      childTableFuture.onComplete {
        case Success(x) => println("-child table updated successfully")
        case Failure(error) => throw new Exception(error)
      }
      val childTable2Future = Future {
        updateChild2(childTable2Conn)
      }
      childTable2Future.onComplete {
        case Success(x) => println("-child2 table updated successfully")
        case Failure(error) => throw new Exception(error)
      }
      parentTableConnmit()
      childTableConnmit()
      childTable2Connmit()

    } catch {
      case e: Exception => {
        println(s"ERROR: exception thrown rolling back db commits\n${e}")
        parentTableConn.rollback()
        childTableConn.rollback()
        childTable2Conn.rollback()

        throw e
      }
    } finally {
      parentTableConn.close()
      childTableConn.close()
      childTable2Conn.close()
    }
  }

  def updateParent(conn: Connection): Unit = {
    val sql = "insert into parent (parent_id) values (1),(2),(3);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

  def updateChild(conn: Connection): Unit = {
    val sql = "insert into child (id, parent_id) values (1,1),(2,2),(3,1);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

  def updateChild2(conn: Connection): Unit = {
    val sql = "insert into child2 (id, parent_id) values (1,3),(2,3),(3,1);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

}

I want to insert into multiple tables in parallel, these tables have foreign key relationships. The way I am attempting to do it (below) does not work i get a foreign key violation.
java.lang.Exception: .postgresql.util.PSQLException: ERROR: insert or update on table "child2" violates foreign key constraint "child2_parent_id_fkey" I know what i'm doing here not the correct way to do this but I'm having trouble understanding from what i'm reading online what the best practice is. My goals here are

  1. insert in parallel (if posible)
  2. be able to rollback all tables on any error.
class Scratch (creds: DbCreds, dbType: String, driverClassName: String, initialPoolSize: Int = 4,
               maxPoolSize: Int = 10, queryTimeout: Int = 600000) extends DbConnectionPool with Configs {

  val connectionPool = setConnectionPool(creds, dbType, driverClassName, initialPoolSize, maxPoolSize, queryTimeout)
  implicit val ec = ExecutionContext.global

  def updateTables(): Unit = {

//     these are how the tables are set up: 
//    create table parent (parent_id int primary key);
//    create table child (id int primary key, parent_id int references parent(parent_id));
//    create table child2 (id int primary key, parent_id int references parent(parent_id));

    val parentTableConn: Connection = getNonCommitConnection()
    val childTableConn: Connection  = getNonCommitConnection()
    val childTable2Conn: Connection = getNonCommitConnection()

    try {
      val parentTableFuture = Future {
        updateParent(parentTableConn)
      }
      parentTableFuture.onComplete {
        case Success(x) => println("-parent table updated successfully")
        case Failure(error) => throw new Exception(error)
      }
      Await.result(parentTableFuture, queryTimeout millisecond)
      val childTableFuture = Future {
        updateChild(childTableConn)
      }
      childTableFuture.onComplete {
        case Success(x) => println("-child table updated successfully")
        case Failure(error) => throw new Exception(error)
      }
      val childTable2Future = Future {
        updateChild2(childTable2Conn)
      }
      childTable2Future.onComplete {
        case Success(x) => println("-child2 table updated successfully")
        case Failure(error) => throw new Exception(error)
      }
      parentTableConnmit()
      childTableConnmit()
      childTable2Connmit()

    } catch {
      case e: Exception => {
        println(s"ERROR: exception thrown rolling back db commits\n${e}")
        parentTableConn.rollback()
        childTableConn.rollback()
        childTable2Conn.rollback()

        throw e
      }
    } finally {
      parentTableConn.close()
      childTableConn.close()
      childTable2Conn.close()
    }
  }

  def updateParent(conn: Connection): Unit = {
    val sql = "insert into parent (parent_id) values (1),(2),(3);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

  def updateChild(conn: Connection): Unit = {
    val sql = "insert into child (id, parent_id) values (1,1),(2,2),(3,1);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

  def updateChild2(conn: Connection): Unit = {
    val sql = "insert into child2 (id, parent_id) values (1,3),(2,3),(3,1);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

}
Share Improve this question asked Mar 10 at 16:12 J.HammondJ.Hammond 2786 silver badges20 bronze badges 7
  • 1 Concurrent sessions(connections) will not be able to 'see' actions of the other sessions until the actions are committed. If you want this to work you have to either: a) Do it all in one session(connection) or b) Commit the parent action first and then do the child actions. For more information see Transaction Isolation. – Adrian Klaver Commented Mar 10 at 16:18
  • you are trying to parallelize inserting a single parent and a single row in each of two child tables? or are you hoping to parallelize doing this many times? if the latter, are the parents all distinct? – ysth Commented Mar 10 at 16:41
  • In my real use case I would be trying to insert multiple rows into each table (including parent). The "parent" entities would be distinct. – J.Hammond Commented Mar 10 at 16:45
  • Read the link I posted in my previous comment, it will tell you the restrictions you face. – Adrian Klaver Commented Mar 10 at 16:50
  • Adrian Klaver, yes I think I understand from the docs that you cannot read anything uncommitted from another connection. I know I used the wrong terms there but basically you can't do this with postgres. So my issue here is that i'm using different connections for each table? I need to use the same connection and parent table insert query cannot be parallel to the child queries? – J.Hammond Commented Mar 10 at 17:07
 |  Show 2 more comments

1 Answer 1

Reset to default 0

This was my solution for those interested. Also in my real use case I also used DEFERRABLE foreign key constraints per Adrian Klaver's suggestion.

class Scratch (creds: DbCreds, dbType: String, driverClassName: String, initialPoolSize: Int = 4,
               maxPoolSize: Int = 10, queryTimeout: Int = 600000) extends DbConnectionPool with Configs {

  val connectionPool = setConnectionPool(creds, dbType, driverClassName, initialPoolSize, maxPoolSize, queryTimeout)
  implicit val ec = ExecutionContext.global

  def updateTables(): Unit = {

//    create table parent (parent_id int primary key);
//    create table child (id int primary key, parent_id int references parent(parent_id));
//    create table child2 (id int primary key, parent_id int references parent(parent_id));

    val tableConn: Connection = getNonCommitConnection()


    try {
      updateParent(tableConn)

      val f = Future {
        updateChild(tableConn)
      }
      f.onComplete({
        case Success(_) => println("success1")
        case Failure(e) => println(s"ERROR: exception thrown rolling back db commits\n${e}")
      })
      val f2 = Future {
        updateChild2(tableConn)
      }
      f2.onComplete({
        case Success(_) => println("success2")
        case Failure(e) => println(s"ERROR: exception thrown rolling back db commits\n${e}")
      })

      tableConnmit()

    } catch {
      case e: Exception => {
        println(s"ERROR: exception thrown rolling back db commits\n${e}")
        tableConn.rollback()


        throw e
      }
    } finally {
      tableConn.close()

    }
  }



  def updateParent(conn: Connection): Unit = {
    val sql = "insert into parent (parent_id) values (1),(2),(3);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

  def updateChild(conn: Connection): Unit = {
    val sql = "insert into child (id, parent_id) values (1,1),(2,2),(3,1);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

  def updateChild2(conn: Connection): Unit = {
    val sql = "insert into child2 (id, parent_id) values (1,3),(2,3),(3,1);"
    val stmt = conn.createStatement()
    stmt.execute(sql)
    stmt.close()
  }

}

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744835483a4596236.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信