SSIS ile Oracle fonksiyonundan veri çekmek

Bu yazım SSIS (SQL Server Integration Services) hakkındaki ilk yazım. Aslında uzun zamandır SSIS ile uğraşıyorum ve kenara köşeye aldığım bir çok notum var. Artık bu notlarımıda bu blogta baylaşmaya karar verdim. Bir çoğumuzun bildiği gibi SSIS Microsoft’un ETL aracıdır. ETL’in açılımı “Extract, Transform and Load” dur. Kısacası ETL herhangi bir kaynaktan bir verinin çekilmesi, gerekli düzenlemeler yapılarak kullanılır hale getirilmesi ve hedefe yüklenmesidir.

En son çalıştığım projede hedefimiz Oracle veritabanında birden çok tablodan veri çekmemiz ve bunu MS BizTalk’un kullanabileceği şekilde bir dosyaya yazmaktı. Bu hedef ilk başta kolay gözükse de işin içine girdiğimiz zaman Oracle’da hazırlanmış bir kaç Oracle fonksiyonu kullanarak bu verileri çekmek zorunda olduğumuzu fark ettik. Biraz araştırmadan sonra bu işi hazır SSIS komponentleri kullanarak kolaylıkla yapamayacağımızı gördük.

Problem, oracle veri bağlantısı kullanmamamıza rağmen, Oracle fonksiyonu refCursor tipinde bir veri geri döndürdüğü için SSIS bu tip bir objeyi okuyamıyor ve SQL tablo yapısına çeviremiyor. Attunity Oracle Adapter’de dahil denediğimiz bütün Oracle veri kaynakları bu tarz bir dönüşüm yapamadığını gördük. Çözüm olarak, SSIS Data Flow içersinde veri kaynağı olarak Script Task kullanarak C# ile gelen objeyi okuyup data flow pipeline’ına vermekti.

Genel olarak takip ettiğimiz adımlar şu şekilde:

1. Yeni bir Connection Manager ekliyoruz ve Oracle veritabanına bağlıyoruz. Adına OracleConnectionManager diyoruz.

2. Control Flow’a Data Flow task’ı ekliyoruz.

3. Data Flow’un içersine girdikten sonra, Script Component ekliyoruz.

4. Çıkan menüde Script Component’i kaynak (source) olarak seçiyoruz.

5. Script componentin özelliklerinden ilk olarak Output kolonlarını tanımlamamız gerekiyor, ben genellikle bütün kolonları String olarak tanımlıyorum. Daha sonra ihtiyacım olduğunda Data Conversion component ile  diğer veri tiplerine cevirim yapıyorum.

6. Script component’da output kolonlarımızı tanımladıktan sonra, Script Editor ile Oracle veritabanına bağlanıp ilgili fonksiyonu çağırdıktan sonra, sonucu output kolonlarına göndermek.

7. C# kodunu yazdıktan sonra scirpt componentini OLE DB Destination ile istediğimiz SQL Server tablosuna bağlıyoruz.

8. Paketi çalıştırdığımız zaman C# kodumuz Oracle fonksiyonundan  refCursor tipinde aldığı veriyi okuyarak OLE DB destionation’a gönderiyor. Bu sayede verimizi SQL Server tablosuna yamış oluyoruz.

Script Component’in içersini detaylı olarak incelemek gerekirse;

Bildiğiniz gibi zaten SSIS bizim için bütün methodları hazırlanmış bir class veriyor, bize kalan sadece ilgili event’ler içersine istediğimiz kodu yazmak.

ScriptMain classı içersinde ilk olarak SSIS de önceki adımlarda tanımladığımız Oracle Connection Manager’ı bağlıyoruz. Bu sayede veri bağlantılarımız değiştiği zaman kodumuzu değiştirmek zorunda kalmayacağız.

IDTSConnectionManager100 connMgr;
OracleConnection connection = new OracleConnection();
OracleCommand command = new OracleCommand();

Sıra AcquireConnections methoduna geldi; Bu metod otomatik olarak Script Component tarafından yaratılmıyor. Bunu elimizle yaratmamız gerekiyor. Eğer bu metodu nasıl ekleyeceğinizi bilmiyorsanız şu şekilde ekleniyor. Class’da boş bir yere yeni metod yazar gibi public override yazın ve boşluğa basın Visual Studio otomatik olarak UserComponent class’ından gelen ve kullanabileceğiniz bütün methodları sıralayacaktır oradan AcquireConnections metodunu seçin VS otomatik olarak methodu ekleyecektir. Veya direkt aşağıdaki method kopyalayabilirsiniz.

public override void AcquireConnections(object Transaction)
 {
 base.AcquireConnections(Transaction);
 }

Bu metodu yarattıktan sonra içini temizleyin ve içine şunları ekleyin

connMgr = this.Connections.OracleConnectionManager;
connection = (OracleConnection)connMgr.AcquireConnection(Transaction);

Burada dikkatinizi çekmek istediğim tek kısım OracleConnectionManager ismi sizin SSIS’de tanımladığınız ve Oracle’a bağlanmanıza yarayan Connection Manager’ın ismi. Yani eğer sizin Connection Manager’ınızın adı MyConnectionManager ile yukarıda ki satır şu şekilde olacaktı connMgr = this.Connections.MyConnectionManager. Böylece Connection Manager’ımızdan bağlantı bilgilerinide almış bulunuyoruz.

Sıra geldi PreExecute metoduna. Bu metod, Script Component task çalıştırılmadan önce çalışacak olan bir method o yüzden bütün değişken tanımlamalarımızı burada yapabiliriz. Bu arada bu tanımlamalrın hepsini aynı metodun içersinde de yapabilirdiniz fakat bu yol bence daha doğru. Sonuçta MS her event için metod hazırlamış niye kullanmayalım.

Bu metodun içersi şu şekilde,

command.Connection = connection;
command.CommandText = "<<FONKSIYON ISMI>>";
command.CommandType = CommandType.StoredProcedure;
command.Parameters.Add(new OracleParameter("ReturnValue", OracleType.Cursor)).Direction = ParameterDirection.ReturnValue;

Burada ilk 2 satırda yukarıda tanımladığımız bağlantıyı verdikten sonra çalıştırmak istediğimiz Oracle fonksiyonunun adını CommandText property’sine ekliyoruz. CommandType olarak enum değeri olan StoredProcedure seçiyoruz.

Daha sonra önemli olarak bir geri dönüş yani ReturnValue parametresi tanımlamamız gerekiyor, bunun nedeni fonksiyon çalıştıktan sonra refCursor tipindeki değeri yakalayabilmemiz.

Sıradan gittiğimizde önümüze PostExecute ve ReleaseConnections metodları çıkıyor. Bu metodlar üzerine çok fazla zaman harcamanın gereği yok. Kısacası PostExecute’da Script Component’in işlemi bittikten sonra yapılacakları yazıyoruz. ReleaseConnections metodu ile de bağlandığımız bağlantıları geri bırakıyoruz.

En son olarak CreateNewOutputRows metoduna geldik. Bu metodda AcquireConnections ve PreExecute metodu içersinde tanımladığımız bütün değerleri kullanmaya başlıyoruz. Aslında en yalın işi burada yapyoruz. Takip ettiğimiz adımlar şu şekilde;

1. OracleReader objesi yarat.

OracleDataReader reader;

2. Önceden tanımladığımız command’i command.ExecuteReader() ile çalıştır.

reader = command.ExecuteReader();

3.  ScriptComponent’dan çıkacak bir satır oluştur ve oluşan satıra command.ExecuteReader() ‘dan gelen sonuçları reader.Read() ile oku ve kolonlara yaz. Okuma bitene kadar bunu tekrarla.

while (reader.Read())
{
OutputBuffer.AddRow();
OutputBuffer.COL1 = reader[0].ToString();
OutputBuffer.COL2 = reader[1].ToString();
OutputBuffer.COL3 = reader[2].ToString();
OutputBuffer.COL4 = reader[3].ToString();
OutputBuffer.COL5 = reader[4].ToString();
OutputBuffer.COL6 = reader[5].ToString();
OutputBuffer.COL7 = reader[6].ToString();
OutputBuffer.COL8 = reader[7].ToString();
OutputBuffer.COL9 = reader[8].ToString();
OutputBuffer.COL10 = reader[9].ToString();
OutputBuffer.COL11 = reader[10].ToString();
}

4. Bağlantıları kapat.

Bu metoduda yazarak bütün kodumuzu bitirmiş oluyoruz. Artık bundan sonra hazırladığımız bu Script Component’dan çıkan veriyi istediğiniz şekilde üzerinde değişiklik yapabilir ve istediğiniz yere yazabilirsiniz. Gerisi size kalıyor. Umarım yazım işinize yarar, yarumlarınızı bekliyorum.

————————————————————————————-

Tüm C# Kodu şu şekilde:

/* Microsoft SQL Server Integration Services Script Component
* Write scripts using Microsoft Visual C# 2008.
* ScriptMain is the entry point class of the script.*/
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Data.OracleClient;
using System.Windows.Forms;
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
IDTSConnectionManager100 connMgr;
OracleConnection connection = new OracleConnection();
OracleCommand command = new OracleCommand();
public override void AcquireConnections(object Transaction)
{
connMgr = this.Connections.OracleConnectionManager;
connection = (OracleConnection)connMgr.AcquireConnection(Transaction);
}
public override void PreExecute()
{
command.Connection = connection;
command.CommandText = "<<FONKSIYON ISMI>>";
command.CommandType = CommandType.StoredProcedure;
command.Parameters.Add(new OracleParameter("ReturnValue", OracleType.Cursor)).Direction = ParameterDirection.ReturnValue;
}
public override void PostExecute()
{
//base.PostExecute();
command.Dispose();
}
public override void ReleaseConnections()
{
//base.ReleaseConnections();
connMgr.ReleaseConnection(connection);
}
public override void CreateNewOutputRows()
{
try
{
OracleDataReader reader;
reader = command.ExecuteReader();
while (reader.Read())
{
OutputBuffer.AddRow();
OutputBuffer.COL1 = reader[0].ToString();
OutputBuffer.COL2 = reader[1].ToString();
OutputBuffer.COL3 = reader[2].ToString();
OutputBuffer.COL4 = reader[3].ToString();
OutputBuffer.COL5 = reader[4].ToString();
OutputBuffer.COL6 = reader[5].ToString();
OutputBuffer.COL7 = reader[6].ToString();
OutputBuffer.COL8 = reader[7].ToString();
OutputBuffer.COL9 = reader[8].ToString();
OutputBuffer.COL10 = reader[9].ToString();
OutputBuffer.COL11 = reader[10].ToString();
}
reader.Close();
connection.Close();
}
catch (Exception e)
{
//MessageBox.Show(e.Message);
}
OutputBuffer.SetEndOfRowset();
}
}
Reklamlar

Bir Cevap Yazın

Aşağıya bilgilerinizi girin veya oturum açmak için bir simgeye tıklayın:

WordPress.com Logosu

WordPress.com hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap / Değiştir )

Twitter resmi

Twitter hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap / Değiştir )

Facebook fotoğrafı

Facebook hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap / Değiştir )

Google+ fotoğrafı

Google+ hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap / Değiştir )

Connecting to %s