scala - Kerberos ticket renewal for long-running application - Stack Overflow

We have a long-running app. With the settings below, the Kerberos ticket's renewable lifetime runs out after 7 days:

ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
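
To make the timing concrete, here is a minimal sketch of the arithmetic, using the two lifetimes from the settings above and the 8-hour relogin interval described in this question. The object and method names are made up for illustration. As far as I understand the Kerberos model, `renew_lifetime` limits how long a single TGT can keep being renewed, whereas a login from a keytab requests a new TGT, so the sketch only checks that each relogin happens before the current ticket could expire.

```scala
import scala.concurrent.duration._

// Hypothetical helper: names are illustrative, values come from the question.
object TicketTimeline {
  val ticketLifetime: FiniteDuration = 24.hours  // ticket_lifetime
  val renewLifetime: FiniteDuration = 7.days     // renew_lifetime
  val reloginInterval: FiniteDuration = 8.hours  // background thread period

  /** True when a relogin always happens before the current ticket can expire. */
  def reloginBeatsExpiry: Boolean = reloginInterval < ticketLifetime

  /** How many relogins fit into one ticket lifetime. */
  def reloginsPerTicketLifetime: Long =
    ticketLifetime.toHours / reloginInterval.toHours

  /** How many relogins happen within one renewable window. */
  def reloginsPerRenewWindow: Long =
    renewLifetime.toHours / reloginInterval.toHours
}
```

With these values a relogin happens three times per ticket lifetime, so under that assumption a ticket obtained from the keytab should never be older than 8 hours.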

I have this class, which creates a UGI cache when the app starts. A background thread then re-logs in every cached UGI every 8 hours.

Is there any issue with using the forceReloginFromKeytab API this way?

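import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

import scala.collection.concurrent.TrieMap

case class UGIEntry(ugi: UserGroupInformation, principal: String)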

/** Manages the caching and refreshing of UserGroupInformation (UGI) for HDFS access. */
object UGICache extends Logging {

  private val cache: TrieMap[String, UGIEntry] = TrieMap.empty

  /**
   * Normalize a given path by removing trailing slashes.
   *
   * @param path The HDFS path.
   * @return Normalized path.
   */
  private def normalizePath(path: String): String = path.replaceAll("/+$", "")

  /**
   * Pre-warm the UGI cache with a predefined list of principals and keytab paths, when the application starts.
   *
   * @param preWarmData A map of base HDFS path -> (keytabPath, principal).
   */
  def preWarmUGICache(preWarmData: Map[String, (String, String)]): Unit = {
    logger.info("Starting UGI cache pre-warming...")
    preWarmData.foreach { case (hdfsBasePath, (keytabPath, principal)) =>
      try {
        val normalizedPath = normalizePath(hdfsBasePath)
        val ugi = KerberosHadoopAuthenticator.getAuthenticatedUGI(None, KerberosHadoopAuthenticator.hadoopConfig)
        val ugiEntry = UGIEntry(ugi, principal)
        cache.put(normalizedPath, ugiEntry)
        logger.info(s"Pre-warmed UGI cache for HDFS Path: $normalizedPath, Principal: $principal")
      } catch {
        case ex: Exception =>
          logger.error(s"Failed to pre-warm UGI cache for HDFS Path: $hdfsBasePath, Principal: $principal. Error: ${ex.getMessage}")
      }
    }
    logger.info("UGI cache pre-warming completed.")
  }

  /**
   * Add a UGI to the cache dynamically.
   *
   * @param hdfsPath HDFS base path.
   * @param ugiEntry The UGI entry to cache.
   */
  def addUGI(hdfsPath: String, ugiEntry: UGIEntry): Unit = {
    val normalizedPath = normalizePath(hdfsPath)
    cache.put(normalizedPath, ugiEntry)
    logger.info(s"Added UGI for HDFS Path: $normalizedPath, Principal: ${ugiEntry.principal}")
  }

  /**
   * Retrieve a UGI from the cache.
   *
   * @param hdfsPath The HDFS path to look up.
   * @return The UGIEntry if found.
   */
  def getAuthenticatedUGI(hdfsPath: String): UGIEntry = {
    val normalizedPath = normalizePath(hdfsPath)
    cache.find { case (key, _) => normalizedPath.startsWith(key) } match {
      case Some((key, entry)) =>
        if (logger.isDebugEnabled) {
          logger.debug(s"Retrieved UGI for base path: $key (requested path: $normalizedPath)")
        }
        entry
      case None =>
        throw new IllegalArgumentException(s"No UGI found for HDFS Path: $hdfsPath")
    }
  }

  /**
   * Refresh (re-login) all UGIs in the cache.
   */
  def refreshAllUGIs(): Unit = {
    cache.foreach { case (hdfsPath, entry) =>
      try {
        entry.ugi.doAs(new PrivilegedExceptionAction[Unit] {
          override def run(): Unit = {
            UserGroupInformation.getCurrentUser.forceReloginFromKeytab()
            logger.info(s"Kerberos ticket force renewed for HDFS Path: $hdfsPath, Principal: ${entry.principal}")
          }
        })
      } catch {
        case ex: Exception =>
          logger.error(s"Failed to renew Kerberos ticket for HDFS Path: $hdfsPath, Principal: ${entry.principal}. Error: ${ex.getMessage}")
      }
    }
  }

}
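
The background thread that triggers the refresh every 8 hours is not shown above. A minimal sketch of one way to drive it, assuming a plain `ScheduledExecutorService` is acceptable, could look like this. `ReloginScheduler` and its parameters are hypothetical names, and the refresh action is passed in as a function so the sketch does not depend on Hadoop.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical helper, not part of the class above.
object ReloginScheduler {

  /** Runs `refresh` on a single daemon thread, waiting `interval` between runs. */
  def start(refresh: () => Unit, interval: Long, unit: TimeUnit): ScheduledExecutorService = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "ugi-relogin")
        t.setDaemon(true) // do not keep the JVM alive just for refreshes
        t
      }
    }
    val scheduler = Executors.newSingleThreadScheduledExecutor(factory)
    val task = new Runnable {
      override def run(): Unit =
        try refresh()
        catch {
          // An exception escaping run() would suppress all later executions,
          // so report it and let the next scheduled run try again.
          case NonFatal(ex) => System.err.println(s"UGI refresh failed: ${ex.getMessage}")
        }
    }
    scheduler.scheduleWithFixedDelay(task, interval, interval, unit)
    scheduler
  }
}
```

Usage would be along the lines of `ReloginScheduler.start(() => UGICache.refreshAllUGIs(), 8, TimeUnit.HOURS)`. Catching exceptions inside the task matters because a scheduled task that throws is not run again.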

As it stands, I have to wait 7 days to see whether this logic actually works.
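
Rather than waiting out the full 7 days, one option might be to log the TGT's end time after each relogin and check that it keeps moving forward. The sketch below uses only JDK classes and reads Kerberos tickets from a JAAS `Subject`. `TgtInfo` and `TgtInspector` are hypothetical names. How the `Subject` is obtained is an assumption: inside `ugi.doAs`, calling `Subject.getSubject(AccessController.getContext)` should return it on JDKs that still support that call.

```scala
import java.time.Instant
import javax.security.auth.Subject
import javax.security.auth.kerberos.KerberosTicket
import scala.collection.mutable.ListBuffer

// Hypothetical summary of one ticket-granting ticket.
final case class TgtInfo(client: String, endTime: Instant, renewTill: Option[Instant])

object TgtInspector {

  /** Lists the TGTs held by `subject`, with their expiry and renew-till times. */
  def tgtInfo(subject: Subject): List[TgtInfo] = {
    val found = ListBuffer.empty[TgtInfo]
    val it = subject.getPrivateCredentials(classOf[KerberosTicket]).iterator()
    while (it.hasNext) {
      val ticket = it.next()
      // A TGT is issued for the krbtgt/REALM@REALM service principal.
      val server = Option(ticket.getServer).map(_.getName).getOrElse("")
      if (server.startsWith("krbtgt/") && ticket.getEndTime != null) {
        found += TgtInfo(
          Option(ticket.getClient).map(_.getName).getOrElse("unknown"),
          ticket.getEndTime.toInstant,
          Option(ticket.getRenewTill).map(_.toInstant)) // null when not renewable
      }
    }
    found.toList
  }
}
```

If the logged `endTime` jumps ahead by roughly the ticket lifetime after every refresh, the relogin is producing new tickets. Temporarily lowering the lifetimes in a test environment would shorten the wait further.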
