data.table의 rolling join

data.table에서 제공하는 inequality join method 중 하나인 rolling merge에 대해 알아보자

R
data.table
Author
Published

December 13, 2024

1. Introduction: Data.table

data.table은 대용량의 데이터를 처리하는 데 있어 빠른 속도와 메모리 효율을 보여주는 패키지이다. 또한 dplyr와 비교하여, dplyr에서 지원하지 않는 기능도 가지고 있는데, 그 중 하나인 rolling join을 소개한다.

1. join

일반적으로 join이라 함은 원하는 재료집합이 2개 이상일때, 이를 인결하여 새로운 집합을 만드는 연산을 의미한다. 당연히 join 연산을 할 때마다 집합이 확장되며 컬럼의 수가 증가하게 된다. join 연산의 경우 일반적으로는 equality condition을 사용한다. 즉 사용되는 집합이 일치하는 경우에만 연산이 이루어진다. 보통 이러한 조건 때문에 inner join, outter join 등을 사용하게 된다.

2. rolling join

rolling join은 inequality condition을 사용한다. 병합의 기준이 되는 컬럼 내에서 값을 탐색할 때, 다음과 같은 단계를 따를 수 있다.

① 일단 일치하는 값이 있는지 확인하고, 없으면 선택한 방향을 따라 탐색한다. ② 탐색 범위에 기준 값에 가장 가까운 값이 있으면 그 값이 존재하는 행과 merge를 실행한다. ③ 탐색 범위 내에 값이 존재하지 않으면 병합을 실행하지 않는다.

일반적으로 두 data.table object를 병합할 때에는 다음과 같이 실행할 수 있다. 여기서 on = 을 활용해서 기준이 되는 컬럼을 지정할 수 있다.

result_dt <- dt1[dt2, on = .(key_column1, key_column2)]

그러나 rolling join을 실행하고 싶은 경우, 다음과 같이 작성할 수 있다.

result_dt <- dt1[dt2, on = .(key_column1, key_column2), roll = Inf] # roll option = c(Inf, -Inf, number, "nearest")

2. Data.table의 roll merge option

1. roll = 옵션

아래는 실습에 사용할 데이터를 전처리하는 코드이다. 실습 데이터는 다음 링크에서 다운받을 수 있으며, 4개 파일을 사용하여 실습을 진행할 것이다.

library(data.table);library(magrittr)

bnc <- fread("data/nsc2_bnc_1000.csv") 
bnd <- fread("data/nsc2_bnd_1000.csv")[, Deathdate := (lubridate::ym(DTH_YYYYMM) %>% lubridate::ceiling_date(unit = "month") - 1)][]
m20 <- fread("data/nsc2_m20_1000.csv") 
m40 <- fread("data/nsc2_m40_1000.csv")[SICK_CLSF_TYPE %in% c(1, 2, NA)] 

code.HTN <- paste(paste0("I", 10:15), collapse = "|")
data.start <- m20[like(SICK_SYM1, code.HTN) & (MDCARE_STRT_DT >= 20060101), .(Indexdate = min(MDCARE_STRT_DT)), keyby = "RN_INDI"]

## Previous disease: Among all sick code
excl <- m40[(MCEX_SICK_SYM %like% code.HTN) & (MDCARE_STRT_DT < 20060101), .SD[1], .SDcols = c("MDCARE_STRT_DT"), keyby = "RN_INDI"]

## Merge: left anti join
data.incl <- data.start[!excl, on = "RN_INDI"][, Indexdate := as.Date(as.character(Indexdate), format = "%Y%m%d")][]
data.asd <- merge(bnd, bnc[, .(SEX = SEX[1]), keyby = "RN_INDI"], by = "RN_INDI") %>% 
  merge(data.incl, by = "RN_INDI") %>% 
  .[, `:=`(Age = year(Indexdate) - as.integer(substr(BTH_YYYY, 1, 4)),
           Death = as.integer(!is.na(DTH_YYYYMM)),
           Day_FU = as.integer(pmin(as.Date("2015-12-31"), Deathdate, na.rm =T) - Indexdate))] %>% .[, -c("BTH_YYYY", "DTH_YYYYMM", "Deathdate")] 
code.cci <- list(
  MI = c("I21", "I22", "I252"),
  CHF = c(paste0("I", c("099", 110, 130, 132, 255, 420, 425:429, 43, 50)), "P290"),
  Peripheral_VD = c(paste0("I", 70, 71, 731, 738, 739, 771, 790, 792), paste0("K", c(551, 558, 559)), "Z958", "Z959"),
  Cerebro_VD = c("G45", "G46", "H340", paste0("I", 60:69)),
  Dementia = c(paste0("F0", c(0:3, 51)), "G30", "G311"),
  Chronic_pulmonary_dz = c("I278", "I279", paste0("J", c(40:47, 60:67, 684, 701, 703))),
  Rheumatologic_dz = paste0("M", c("05", "06", 315, 32:34, 351, 353, 360)),
  Peptic_ulcer_dz = paste0("K", 25:28),
  Mild_liver_dz = c("B18", paste0("K", c(700:703, 709, 713:715, 717, 73, 74, 760, 762:764, 768, 769)), "Z944"),
  DM_no_complication = paste0("E", c(100, 101, 106, 108:111, 116, 118:121, 126, 128:131, 136, 138:141, 146, 148, 149)),
  DM_complication = paste0("E", c(102:105, 107, 112:115, 117, 122:125, 127, 132:135, 137, 142:145, 147)),
  Hemi_paraplegia = paste0("G", c("041", 114, 801, 802, 81, 82, 830:834, 839)),
  Renal_dz = c("I120", "I131", paste0("N", c("032", "033", "034", "035", "036", "037", "052", "053", "054", "055", "056", "057",
                                             18, 19, 250)), paste0("Z", c(490:492, 940, 992))),
  Malig_with_Leuk_lymphoma = paste0("C", c(paste0("0", 0:9), 10:26, 30:34, 37:41, 43, 45:58, 60:76, 81:85, 88, 90, 97)),
  Moderate_severe_liver_dz = c(paste0("I", c(85, 859, 864, 982)), paste0("K", c(704, 711, 721, 729, 765:767))),
  Metastatic_solid_tumor = paste0("C", 77:80),
  AIDS_HIV = paste0("B", c(20:22, 24))
)
cciscore <- c(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 6, 6, 2)
names(cciscore) <- names(code.cci)

지금부터는 실제 의료 데이터를 활용하여 roll merge가 사용될 수 있는 상황에 대해 알아보고, 이를 코드로 적용해 보겠다. 데이터는 성균관대학교 바이오헬스규제학과 강의에 사용된 건강보험공단 데이터를 사용하였다. 일단 roll merge를 시행할 데이터에 대해 알아보자.

roll = Inf

[1] "병력 진단 기준 날짜(Indexdate)가 있는 data.asd 데이터"
Key: <RN_INDI>
   RN_INDI   COD1   COD2   SEX  Indexdate   Age Death Day_FU
     <int> <char> <char> <int>     <Date> <int> <int>  <int>
1:   13546                   1 2006-08-08    45     0   3432
2:   23682                   1 2008-09-22    53     0   2656
3:   36714                   2 2010-01-19    64     0   2172
4:   39217                   1 2013-04-02    56     0   1003
5:   46621                   1 2011-03-24    51     0   1743
6:   51049                   1 2006-11-23    21     0   3325
[1] "병력 진단 날짜(MDCARE_STRT_DT)가 있는 m40 데이터"
   RN_INDI        RN_KEY MDCARE_STRT_DT FORM_CD MCEX_SICK_SYM
     <int>         <i64>          <int>   <int>        <char>
1:  596535 2002120187152       20021202       3          J209
2:  615374 2002121012274       20021202       3          J209
3: 1005547 2002120808216       20021202       3          J209
4:  226594 2002120381612       20021202       3          J209
5:  204930 2002120790182       20021202       3          J209
6:  798943 2002040446183       20020401       3          J209
   DETAIL_TMSG_SUBJ_CD SICK_CLSF_TYPE STD_YYYY
                <char>          <int>    <int>
1:                                 NA     2002
2:                                 NA     2002
3:                                 NA     2002
4:                                 NA     2002
5:                                 NA     2002
6:                                 NA     2002

첫 번째 데이터는 환자별로 병력 진단 기준일이 되는 날짜가 적혀 있다. 그리고 두 번째 데이터는 환자가 병력 진단을 받았을 경우 해당 날짜가 적혀 있다. 우리는 이 데이터를 가지고, 나름의 기준을 세워서 두 데이터를 병합하는 것이 목적이다. 만약 ’첫 번째 데이터의 Indexdate를 기준으로 그 이전의 모든 날짜에서 진단일이 한 번이라도 있으면 병력이 존재하는 것으로 간주’하려면 어떻게 해야 할까?

병합을 하는 기준이 비교하는 두 날짜가 완벽히 일치하는 것이기 아니기 때문에 일반적인 join method를 사용할 수 없다. 이러한 경우에 roll merge를 사용할 수 있다.

info.cci <- lapply(names(code.cci), function(x){
  data.asd[, MDCARE_STRT_DT := Indexdate]
  dt <- m40[like(MCEX_SICK_SYM, paste(code.cci[[x]], collapse = "|"))][, MDCARE_STRT_DT := as.Date(as.character(MDCARE_STRT_DT), format = "%Y%m%d")][, .(RN_INDI, MDCARE_STRT_DT, Incidate = MDCARE_STRT_DT)]  
  dt[, .SD[1], keyby = c("RN_INDI", "MDCARE_STRT_DT")][data.asd, on = c("RN_INDI", "MDCARE_STRT_DT"), roll = Inf]})

print(head(info.cci[[10]], n = 5))
   RN_INDI MDCARE_STRT_DT   Incidate   COD1   COD2   SEX  Indexdate   Age Death
     <int>         <Date>     <Date> <char> <char> <int>     <Date> <int> <int>
1:   13546     2006-08-08       <NA>                   1 2006-08-08    45     0
2:   23682     2008-09-22       <NA>                   1 2008-09-22    53     0
3:   36714     2010-01-19 2009-12-26                   2 2010-01-19    64     0
4:   39217     2013-04-02 2013-04-02                   1 2013-04-02    56     0
5:   46621     2011-03-24 2011-02-16                   1 2011-03-24    51     0
   Day_FU
    <int>
1:   3432
2:   2656
3:   2172
4:   1003
5:   1743

주의할 점은 data.table의 on=으로 병합을 시도할 경우에는 대상 컬럼이 이름이 같아야 한다는 것이다. 위의 코드에서는 Indexdate, incidate와 값이 같은 MDCARE_STRT_DT 컬럼을 대상으로 roll merge를 시도하였다.

roll = Inf로 옵션을 주었기 때문에 dt의 날짜를 기준으로 data.asd를 연결할 때 정확히 일치하는 날짜가 없다면 data.asd의 날짜를 뒤로 밀어서 일치하는 날짜를 찾는다. 결과적으로 data.asd의 Indexdate 기준으로 앞 날짜에 m40의 Incidate가 존재한다면 병합이 되는 로직이라고 할 수 있다.

roll merge를 사용하지 않는다면 cartesian = T 옵션을 사용하여 모든 가능한 조합을 허용하여 merge하고 그후에 조건에 맞게 필터링하는 과정을 거쳐야 한다. 다음은 rolling join을 사용하지 않은 코드의 예시이다.

info.cci <- lapply(names(code.cci), function(x){
  result <- merge(data.asd[, .(RN_INDI, Indexdate)],
                  m40[like(MCEX_SICK_SYM, paste(code.cci[[x]], collapse = "|"))][order(MDCARE_STRT_DT), .SD[1], keyby = "RN_INDI"][, .(RN_INDI, Incidate = as.Date(as.character(MDCARE_STRT_DT), format = "%Y%m%d"))],
                  by = "RN_INDI", all.x = T)
  result[Indexdate < Incidate, Incidate := NA]
  return(result)
})
print(head(info.cci[[10]], n = 5))
Key: <RN_INDI>
   RN_INDI  Indexdate   Incidate
     <int>     <Date>     <Date>
1:   13546 2006-08-08       <NA>
2:   23682 2008-09-22       <NA>
3:   36714 2010-01-19 2005-12-09
4:   39217 2013-04-02 2007-11-08
5:   46621 2011-03-24 2005-09-20

일단 merge를 수행한 이후, Incidate가 가장 빠른 첫 번째 날짜를 채택하여, 그 날짜가 Indexdate보다 뒤에 있을 경우 NA로 바꾸는 방식으로 필터링하였다. 이렇게 작업하여도 그 이전의 모든 날짜에서 진단일이 한 번이라도 있으면이라는 조건을 만족하는 행을 필터링하는 데에는 문제가 없지만 로직의 차이로 인해 Incidate에 적힌 날짜가 다른 것을 알 수 있다. 또한 필터링 과정 때문에 코드가 길어져 가독성이 좋지 않다.

roll = -Inf

[1] "병력 진단 기준 날짜(Indexdate)가 있는 data.asd 데이터"
Key: <RN_INDI>
   RN_INDI   COD1   COD2   SEX  Indexdate   Age Death Day_FU MDCARE_STRT_DT
     <int> <char> <char> <int>     <Date> <int> <int>  <int>         <Date>
1:   13546                   1 2006-08-08    45     0   3432     2006-08-08
2:   23682                   1 2008-09-22    53     0   2656     2008-09-22
3:   36714                   2 2010-01-19    64     0   2172     2010-01-19
4:   39217                   1 2013-04-02    56     0   1003     2013-04-02
5:   46621                   1 2011-03-24    51     0   1743     2011-03-24
6:   51049                   1 2006-11-23    21     0   3325     2006-11-23
[1] "병력 진단 날짜(MDCARE_STRT_DT)가 있는 m40 데이터"
   RN_INDI        RN_KEY MDCARE_STRT_DT FORM_CD MCEX_SICK_SYM
     <int>         <i64>          <int>   <int>        <char>
1:  596535 2002120187152       20021202       3          J209
2:  615374 2002121012274       20021202       3          J209
3: 1005547 2002120808216       20021202       3          J209
4:  226594 2002120381612       20021202       3          J209
5:  204930 2002120790182       20021202       3          J209
6:  798943 2002040446183       20020401       3          J209
   DETAIL_TMSG_SUBJ_CD SICK_CLSF_TYPE STD_YYYY
                <char>          <int>    <int>
1:                                 NA     2002
2:                                 NA     2002
3:                                 NA     2002
4:                                 NA     2002
5:                                 NA     2002
6:                                 NA     2002

두 번째 예시는 동일한 데이터를 활용할 것이지만, 이번에는 ’첫 번째 데이터의 Indexdate를 기준으로 이후의 모든 날짜에서 발병 기록이 한 번이라도 있으면 발병한 것으로 간주’하려면 어떻게 해야 할까?

data.asd[, MDCARE_STRT_DT := Indexdate]
info.MI <- m40 %>% 
  .[like(MCEX_SICK_SYM, paste(code.cci[["MI"]], collapse = "|")), .(RN_INDI, MDCARE_STRT_DT = as.Date(as.character(MDCARE_STRT_DT), format = "%Y%m%d"), MIdate = as.Date(as.character(MDCARE_STRT_DT), format = "%Y%m%d"))] %>%
  .[data.asd, on = c("RN_INDI", "MDCARE_STRT_DT"), roll = -Inf] 

print(info.MI[40:50])
    RN_INDI MDCARE_STRT_DT     MIdate   COD1   COD2   SEX  Indexdate   Age
      <int>         <Date>     <Date> <char> <char> <int>     <Date> <int>
 1:  484978     2006-02-04 2006-02-04                   2 2006-02-04    63
 2:  505228     2013-04-02       <NA>                   2 2013-04-02    45
 3:  517447     2011-10-05       <NA>                   2 2011-10-05    61
 4:  518792     2011-01-09       <NA>                   1 2011-01-09    50
 5:  529690     2013-01-09       <NA>                   2 2013-01-09    58
 6:  530990     2015-11-25       <NA>                   1 2015-11-25    63
 7:  540586     2008-06-03       <NA>                   1 2008-06-03    59
 8:  546772     2006-07-24       <NA>                   2 2006-07-24    19
 9:  551252     2008-05-20       <NA>                   2 2008-05-20    73
10:  559420     2014-12-22       <NA>                   2 2014-12-22    52
11:  562142     2013-03-27 2013-03-27                   1 2013-03-27    53
    Death Day_FU
    <int>  <int>
 1:     0   3617
 2:     0   1003
 3:     0   1548
 4:     0   1817
 5:     0   1086
 6:     0     36
 7:     0   2767
 8:     0   3447
 9:     0   2781
10:     0    374
11:     0   1009

roll = -Inf로 옵션을 주었기 때문에 m40의 날짜를 기준으로 data.asd를 연결할 때 정확히 일치하는 날짜가 없다면 data.asd의 날짜를 앞으로 당겨서 일치하는 날짜를 찾는다. 결과적으로 data.asd의 Indexdate 기준으로 뒤 날짜에 m40의 Incidate가 존재한다면 병합이 되는 로직이라고 할 수 있다. (여기서는 시작 날짜도 포함하였다.)

roll merge를 사용하지 않는다면 cartesian = T 옵션을 사용하여 모든 가능한 조합을 허용하여 merge하고 그후에 조건에 맞게 필터링하는 과정을 거쳐야 한다. 다음은 rolling join을 사용하지 않은 코드의 예시이다.

info.MI <- merge(data.asd[, .(RN_INDI, Indexdate)],
                 m40[like(MCEX_SICK_SYM, paste(code.cci[["MI"]], collapse = "|"))][order(MDCARE_STRT_DT), .SD[1], keyby = "RN_INDI"][, .(RN_INDI, MIdate = as.Date(as.character(MDCARE_STRT_DT), format = "%Y%m%d"))],
                 by = "RN_INDI", all.x = T) %>%
  .[Indexdate < MIdate, MIdate := NA]

print(info.MI[40:50])
Key: <RN_INDI>
    RN_INDI  Indexdate     MIdate
      <int>     <Date>     <Date>
 1:  484978 2006-02-04 2006-02-04
 2:  505228 2013-04-02       <NA>
 3:  517447 2011-10-05       <NA>
 4:  518792 2011-01-09       <NA>
 5:  529690 2013-01-09       <NA>
 6:  530990 2015-11-25       <NA>
 7:  540586 2008-06-03       <NA>
 8:  546772 2006-07-24       <NA>
 9:  551252 2008-05-20       <NA>
10:  559420 2014-12-22       <NA>
11:  562142 2013-03-27 2013-03-27

일단 merge를 수행한 이후, MIdate의 첫 번째 날짜를 채택하여, 그 날짜가 Indexdate보다 뒤에 있을 경우 NA로 바꾸는 방식으로 필터링하였다. 이렇게 작업하여도 그 이전의 모든 날짜에서 진단일이 한 번이라도 있으면이라는 조건을 만족하는 행을 필터링하는 데에는 문제가 없지만 로직의 차이로 인해 MIdate에 적힌 날짜가 다른 것을 알 수 있다. 또한 필터링 과정 때문에 코드가 길어져 가독성이 좋지 않다.

정리하면 result_dt <- dt1[dt2, on = .(key_column1, key_column2), roll = Inf]에서 roll의 방향은 Inf 옵션일 때에는 dt1을 기준으로 dt2의 컬럼 값을 더 큰값으로 바꾸며 탐색하며, -Inf 옵션일 때에는 dt1을 기준으로 dt2의 컬럼 값을 더 작은 값으로 바꾸며 탐색한다. roll = 옵션에는 숫자도 줄 수 있는데, 이 경우 정해준 컬럼 값 기준 숫자 범위 내에서만 탐색한다. roll = nearest 옵션에서는 양방향 탐색을 진행하되, 가장 가까운 값을 찾아서 merge를 시도한다.

2. rollends = 옵션

rollends = 옵션을 활용하여 rolling을 시작한 경계와 끝 경계에서 어떤 동작을 취할 지 지정할 수 있다. 아래 코드 예시와 같이, rollends 옵션은 두 개의 boolean 값을 가진다. 첫 번째 index는 rolling 시작 경계값에 대한 처리이며, 두 번째 index는 rolling 끝 경계값에 대한 처리이다.

result_dt <- dt1[dt2, on = .(id), roll = Inf, rollends = c(TRUE, TRUE)]

즉 양 끝의 값을 포함할 것인지, 버리고 NA를 취할 것인지에 대한 조정이라고 보면 된다. rollends =옵션이 Inf일 경우 (T, F), -Inf일 경우 (F, T), 숫자일 경우 (F, F)이다. 이전에 사용했던 건강보험공단 데이터로 실행해보면서 알아보자.

info.cci <- lapply(names(code.cci), function(x){
  data.asd[, MDCARE_STRT_DT := Indexdate]
  dt <- m40[like(MCEX_SICK_SYM, paste(code.cci[[x]], collapse = "|"))][, MDCARE_STRT_DT := as.Date(as.character(MDCARE_STRT_DT), format = "%Y%m%d")][, .(RN_INDI, MDCARE_STRT_DT, Incidate = MDCARE_STRT_DT)]  
  dt[, .SD[1], keyby = c("RN_INDI", "MDCARE_STRT_DT")][data.asd, on = c("RN_INDI", "MDCARE_STRT_DT"), roll = Inf, rollends = c(T, F)]})

print(head(info.cci[[10]], n = 5))
   RN_INDI MDCARE_STRT_DT   Incidate   COD1   COD2   SEX  Indexdate   Age Death
     <int>         <Date>     <Date> <char> <char> <int>     <Date> <int> <int>
1:   13546     2006-08-08 2007-03-12                   1 2006-08-08    45     0
2:   23682     2008-09-22       <NA>                   1 2008-09-22    53     0
3:   36714     2010-01-19 2009-12-26                   2 2010-01-19    64     0
4:   39217     2013-04-02 2013-04-02                   1 2013-04-02    56     0
5:   46621     2011-03-24 2011-02-16                   1 2011-03-24    51     0
   Day_FU
    <int>
1:   3432
2:   2656
3:   2172
4:   1003
5:   1743

roll = Inf이므로 Incidate 기준으로 Indexdate는 Incidate보다 큰 값이 탐지되어야 merge를 할 수 있다. 하지만 rollends = (T, F) 로 되어 있으므로 첫 번째 행에서 시작 바운더리인 기준값보다 작아도 roll merge가 일어난 모습을 볼 수 있다.

3. Conclusion

지금까지 건강보험공단 데이터를 활용하여 rolling join이 무엇이고, 왜 사용해야 하며, 어떤 상황에서 사용할 수 있는지 알아보았다. rolling join을 사용하지 않으면 cartesian=T옵션을 사용하여 모든 join의 수를 다 계산한 뒤 목적에 맞게 필터링을 해야 하는 추가 작업을 진행해야 했다. 하지만 rolling join 옵션을 사용하면 연구자의 목적에 맞게 inequality join을 간편하게 사용할 수 있다.

  • rolling join은 inequality join의 일종으로, 기준 값이 완벽하게 일치하지 않아도 주어진 목적에 맞게 두 집합을 병합할 수 있게 해 준다.
  • rolling join의 옵션은 바깥 dt를 기준으로 하여 Inf(순방향), -Inf(역방향), number(숫자 범위), 'nearest'(방향무관)이 있다.
  • rollends = 옵션을 사용하여 경계에서의 roll 여부를 결정할 수 있다.

Reuse

Citation

BibTeX citation:
@online{seo2024,
  author = {Seo, Jeongmin},
  title = {Data.table의 Rolling Join},
  date = {2024-12-13},
  url = {https://blog.zarathu.com/posts/2024-12-13-rollmerge/},
  langid = {en}
}
For attribution, please cite this work as:
Seo, Jeongmin. 2024. “Data.table의 Rolling Join.” December 13, 2024. https://blog.zarathu.com/posts/2024-12-13-rollmerge/.